This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a3d7b2ac214 Dynamic PartitionFunction registry; unify int-to-id mapping via PartitionIdNormalizer (#18446)
a3d7b2ac214 is described below
commit a3d7b2ac214331761b1af3735af09bc4393816e1
Author: Xiang Fu <[email protected]>
AuthorDate: Fri May 8 19:22:37 2026 -0700
Dynamic PartitionFunction registry; unify int-to-id mapping via PartitionIdNormalizer (#18446)
* Make PartitionFunction registry dynamic; move impls to pinot-common
Replaces the closed `PartitionFunctionFactory` enum/switch with an
annotation-based registry so plug-in partition functions no longer
require touching segment-spi code.
- Add `@PartitionFunctionType(names = {...})` annotation in pinot-spi.
- Convert `PartitionFunctionFactory` to a classpath-scanning registry
(regex `.*\.partition\.function\..*`); the factory keeps the same
static API, callers are unchanged. Add `init()` mirroring
`FunctionRegistry.init()` and wire it from broker / server /
controller starters.
- Move the seven built-in impls (`Modulo`, `Murmur` / `Murmur2`,
`Murmur3`, `Fnv`, `HashCode`, `ByteArray`, `BoundedColumnValue`)
out of `pinot-segment-spi` into
`pinot-common/.../partition/function/`, standardize their
constructor on `(int numPartitions, Map<String,String> functionConfig)`,
and annotate each.
- Add `PartitionIntNormalizer` enum (`POSITIVE_MODULO` / `ABS` / `MASK`)
in pinot-segment-spi and a default `getPartitionIdNormalizer()` on
`PartitionFunction` so impls can declare which normalizer matches
their internal modulo semantics. Used by the framework only for
identity / staleness matching between config-side and segment-side
metadata; legacy impls still compute their own modulo. Javadoc spells
out the descriptive-only nature of the value for legacy functions.
- Tests: existing `PartitionFunctionTest` (19 cases) moved to
pinot-common; new `PartitionFunctionFactoryTest` covers
registration completeness, alias resolution (`Murmur` /
`Murmur2`), case-insensitive lookup, idempotent `init()`,
unknown-name rejection, and per-impl normalizer label;
new `PartitionIntNormalizerTest` covers per-normalizer math at
edge cases (`Integer.MIN_VALUE`, `Integer.MAX_VALUE`),
range invariant across all normalizers, and `fromConfigString`
round-trip / blank / unknown.
Plug-in path going forward: drop a class on the classpath under
`*.partition.function.*`, implement `PartitionFunction` with the
standard ctor, add `@PartitionFunctionType(names = "MyFn")` - the
registry picks it up at startup.
NOTE - backward-incompat: the seven impl classes' fully-qualified
names changed (`org.apache.pinot.segment.spi.partition.*` ->
`org.apache.pinot.common.partition.function.*`) and the no-Map
constructors on `Modulo` / `HashCode` / `ByteArray` are gone in favor
of the standard `(int, Map<String,String>)` form. `getName()` strings
(`Modulo`, `Murmur`, ...) are unchanged, so segment-on-disk metadata
is unaffected.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Drop FnvPartitionFunction inner enum; use PartitionIntNormalizer directly
Removes the duplicated `NegativePartitionHandling` inner enum in
`FnvPartitionFunction` and the `_negativePartitionHandling` field in
favor of a single `_normalizer` of type `PartitionIntNormalizer`. The
config key (`negativePartitionHandling`) and accepted values stay
unchanged for `mask` / `abs`; `positive_modulo` is now also
acceptable (strict superset). The error message for unknown values
now comes from `PartitionIntNormalizer.fromConfigString`.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Make getPartitionIdNormalizer() non-null on PartitionFunction
The interface default now returns `PartitionIntNormalizer.POSITIVE_MODULO`
instead of null, and `@Nullable` is dropped from the method. Plug-ins
that don't map onto a standard normalizer (e.g. `BoundedColumnValue`,
which already produces ids in `[0, N)`) inherit the safe default
without needing an explicit override.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Make getPartitionIdNormalizer() abstract; each impl declares its own
Removes the interface default so every PartitionFunction must explicitly
pick a normalizer. BoundedColumnValuePartitionFunction now overrides
with POSITIVE_MODULO (no-op label, since its output is already a fixed
mapping in [0, N)).
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Drive each partition function's int-to-id mapping via _normalizer field
Every built-in PartitionFunction now stores a `_normalizer` of type
PartitionIntNormalizer and applies it in `getPartition(...)` rather
than open-coding the modulo logic. Defaults match prior behavior:
| Impl               | Default normalizer |
|--------------------|--------------------|
| Modulo             | POSITIVE_MODULO    |
| Murmur / Murmur2   | MASK               |
| Murmur3            | MASK               |
| Fnv                | MASK               |
| HashCode           | KAFKA_ABS (new)    |
| ByteArray          | KAFKA_ABS (new)    |
| BoundedColumnValue | POSITIVE_MODULO    |
Adds `KAFKA_ABS` to PartitionIntNormalizer to cover the Kafka-style
`abs(hash) % N` (with `Integer.MIN_VALUE -> 0`) used by HashCode and
ByteArray. With KAFKA_ABS in place the normalizer is now an
authoritative driver of the partition-id computation, not just a
descriptive label - the interface Javadoc is updated accordingly.
Unifies the per-impl override path under a single config key
`partitionIdNormalizer` (case-insensitive), parsed via shared
`PartitionFunctionConfigs#normalizer`. Drops the FNV-specific
`negativePartitionHandling` key.
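A minimal sketch of the four int-to-id mappings named above, for illustration only. `NormalizerSketch` and its method names are hypothetical stand-ins, not the Pinot enum. The `MASK`, `ABS`, and `KAFKA_ABS` bodies mirror the open-coded helpers this change removes from the impls; the `POSITIVE_MODULO` body (`Math.floorMod`) is an assumption.

```java
// Illustrative stand-in only; not the actual PartitionIntNormalizer enum.
final class NormalizerSketch {
  private NormalizerSketch() {
  }

  // Assumed semantics for POSITIVE_MODULO: floor-style modulo, always in [0, n).
  static int positiveModulo(int hash, int n) {
    return Math.floorMod(hash, n);
  }

  // ABS: modulo first, then drop the sign (as in the removed FNV helper).
  static int abs(int hash, int n) {
    int partition = hash % n;
    return partition < 0 ? -partition : partition;
  }

  // MASK: clear the sign bit, then take the modulo (as in the removed FNV helper).
  static int mask(int hash, int n) {
    return (hash & Integer.MAX_VALUE) % n;
  }

  // KAFKA_ABS: abs first, with Integer.MIN_VALUE mapped to 0, then the modulo.
  static int kafkaAbs(int hash, int n) {
    int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
    return nonNegative % n;
  }

  public static void main(String[] args) {
    int hash = Integer.MIN_VALUE;
    int n = 3;
    System.out.println("positiveModulo=" + positiveModulo(hash, n));
    System.out.println("abs=" + abs(hash, n));
    System.out.println("mask=" + mask(hash, n));
    System.out.println("kafkaAbs=" + kafkaAbs(hash, n));
  }
}
```

Under these assumptions, `Integer.MIN_VALUE` with `N = 3` yields 1, 2, 0, and 0 respectively, which is why the default per impl has to match its prior behavior.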
Tests:
- New `KAFKA_ABS` cases in PartitionIntNormalizerTest covering the
MIN_VALUE corner.
- New `testPartitionIdNormalizerConfigOverridesDefaultAcrossImpls`
in PartitionFunctionFactoryTest verifying the config rewires the
computed partition for HashCode, Modulo, and ByteArray.
- FNV tests updated to use `partitionIdNormalizer` config key.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Convert new Javadoc to JEP 467 markdown style
Switches all new Javadoc blocks added by this PR from `/** */` block
syntax to `///` markdown syntax (JEP 467), matching the convention
established in #18165 and the rest of the recently-touched
pinot-segment-spi files. Replaces `<p>`, `<ul>/<li>`, `<code>`,
`{@code X}`, `{@link X}` HTML/Javadoc tags with their markdown
equivalents (paragraph breaks, `-` lists, backticks, `[X]` refs).
License headers remain as `/** */` block comments.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Address PR review feedback
- Rename `PartitionIntNormalizer` -> `PartitionIdNormalizer` (Jackie #3).
- Rename `KAFKA_ABS` -> `PRE_MODULO_ABS` to make the pre-vs-post-modulo
distinction with `ABS` explicit (Jackie #4).
- Change `getPartitionIdNormalizer()` to return `PartitionIdNormalizer`
enum directly instead of `String`; drop `@JsonIgnore` so the field is
visible in serialized form (Jackie #2). Method stays abstract -- every
PartitionFunction implementation declares its own normalizer.
- Drop the redundant `_normalizer` field from
`BoundedColumnValuePartitionFunction`; it now returns
`PartitionIdNormalizer.POSITIVE_MODULO` directly since its output is
already a fixed mapping in `[0, numPartitions)` (Jackie #6).
- Make `@PartitionFunctionType` annotation optional. The factory now
scans every public, concrete `PartitionFunction` subtype under the
`org.apache.pinot.*` package tree. When a class lacks the annotation
(or `names()` is empty), the registry probes
`PartitionFunction.getName()` by instantiating with `(1, null)` and
registers under the returned name (Jackie #5). Annotation is now a
pure aliasing / overriding mechanism.
- Fix Javadoc on `@PartitionFunctionType` and `PartitionFunctionFactory`
to drop the incorrect "any plugin package" claim and the unimplemented
"stripping underscores" note (Copilot #1, #2, #4).
- Add `UnannotatedTestPartitionFunction` fixture + factory test that
exercises the no-annotation `getName()` fallback path.
- Update `PartitionFunctionTest` to expect 4 JSON fields (the new
`partitionIdNormalizer` field is no longer hidden).
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Trim and reject blank @PartitionFunctionType names; fall back to getName()
Validates each entry in `@PartitionFunctionType.names()` at registry
build time:
- Each entry is trimmed of surrounding whitespace.
- Blank entries are dropped silently.
- When ALL declared entries are blank (or the array is empty after
filtering), the registry logs a warning and falls back to probing
`PartitionFunction.getName()`, the same path used for unannotated
classes.
This prevents a misconfigured annotation (e.g. `names = {" "}`) from
silently registering a function under an empty canonical name, making
it undiscoverable at lookup time.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Drop @PartitionFunctionType annotation; drive registry from getNames() default
The annotation added no value beyond what `PartitionFunction.getName()`
already returned for every built-in. Remove it entirely and let the
interface itself declare the registry contract:
- New default `List<String> getNames()` on `PartitionFunction` returns
`[getName()]`. The factory's static scan instantiates each subtype
with `(1, null)` and registers under whatever `getNames()` returns.
- Only `MurmurPartitionFunction` overrides `getNames()` (returns
`["Murmur", "Murmur2"]` so both aliases resolve to the same impl);
the other six built-ins use the default.
- `getNames()` is `@JsonIgnore`'d so it doesn't pollute the
serialized form (`testBasicProperties` JSON-shape assertion stays
at four fields).
- `BoundedColumnValuePartitionFunction`'s ctor now tolerates `null`
config (probe path) and defers validation; real-config use still
throws as before. `getPartition()` rejects probe-built instances.
- `@PartitionFunctionType` annotation file deleted.
- `UnannotatedTestPartitionFunction` fixture removed (no longer
meaningful now that every class is "unannotated").
- Added two new factory tests covering the default `getNames()`
behavior and Murmur's override.
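The name-driven registration described above can be sketched roughly as follows. Every type here (`SketchPartitionFunction`, `SketchModulo`, `SketchAliased`, `SketchRegistry`) is a hypothetical stand-in: the real factory discovers classes by classpath scan, whereas this sketch registers constructors explicitly, and the aliased function uses a placeholder hash rather than Murmur2.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the PartitionFunction interface.
interface SketchPartitionFunction {
  String getName();

  // Default registry contract: a single name, taken from getName().
  default List<String> getNames() {
    return List.of(getName());
  }

  int getPartition(String value);
}

final class SketchModulo implements SketchPartitionFunction {
  private final int _numPartitions;

  SketchModulo(int numPartitions, Map<String, String> functionConfig) {
    _numPartitions = numPartitions;
  }

  public String getName() {
    return "Modulo";
  }

  public int getPartition(String value) {
    return (int) Math.floorMod(Long.parseLong(value), (long) _numPartitions);
  }
}

// Declares two aliases; the hash is a placeholder, NOT Murmur2.
final class SketchAliased implements SketchPartitionFunction {
  private final int _numPartitions;

  SketchAliased(int numPartitions, Map<String, String> functionConfig) {
    _numPartitions = numPartitions;
  }

  public String getName() {
    return "Murmur";
  }

  @Override
  public List<String> getNames() {
    return List.of("Murmur", "Murmur2");
  }

  public int getPartition(String value) {
    return (value.hashCode() & Integer.MAX_VALUE) % _numPartitions;
  }
}

final class SketchRegistry {
  interface Ctor {
    SketchPartitionFunction create(int numPartitions, Map<String, String> functionConfig);
  }

  private final Map<String, Ctor> _byName = new HashMap<>();

  // Build a throwaway probe instance with (1, null) just to read its names.
  void register(Ctor ctor) {
    for (String name : ctor.create(1, null).getNames()) {
      _byName.put(name.toLowerCase(Locale.ROOT), ctor);
    }
  }

  // Case-insensitive lookup; unknown names are rejected.
  SketchPartitionFunction get(String name, int numPartitions, Map<String, String> functionConfig) {
    Ctor ctor = _byName.get(name.toLowerCase(Locale.ROOT));
    if (ctor == null) {
      throw new IllegalArgumentException("No partition function registered for: " + name);
    }
    return ctor.create(numPartitions, functionConfig);
  }

  public static void main(String[] args) {
    SketchRegistry registry = new SketchRegistry();
    registry.register(SketchModulo::new);
    registry.register(SketchAliased::new);
    System.out.println(registry.get("murmur2", 4, null).getName());
    System.out.println(registry.get("MODULO", 4, null).getPartition("10"));
  }
}
```

Because the probe instance is built with a `null` config, any constructor that validates its config eagerly has to tolerate `null`, which is the reason for the `BoundedColumnValuePartitionFunction` change listed above.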
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Add NO_OP PartitionIdNormalizer for already-in-range outputs
`BoundedColumnValuePartitionFunction` produces a fixed mapping in
`[0, numPartitions)` by construction; the previous label was
`POSITIVE_MODULO` (a no-op for in-range values, but semantically
overloaded). Add an explicit `NO_OP` value that is the identity on
its inputs and use it for `BoundedColumnValue`. The framework does
not validate that callers actually pass in-range values to NO_OP —
out-of-range inputs yield out-of-range partition ids, by design.
`PartitionIdNormalizerTest#testRangeFoldingNormalizersReturnInRange`
(was `testAllNormalizersReturnInRange`) now skips NO_OP since the
range invariant doesn't apply. New `testNoOpIsIdentity` locks in the
identity-with-narrowing semantics, including the explicit
out-of-range pass-through.
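As a rough illustration of why an identity normalizer fits here, the sketch below mirrors the lookup loop in `BoundedColumnValuePartitionFunction#getPartition` from this diff. `BoundedLookupSketch` and `noOp` are hypothetical names, and `String.split` with `Pattern.quote` is swapped in for the `StringUtils.split` call the real class uses.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in; not the actual Pinot class.
final class BoundedLookupSketch {
  private final String[] _values;

  BoundedLookupSketch(String columnValues, String delimiter) {
    // Quote the delimiter so that "|" is treated literally, not as regex alternation.
    _values = columnValues.split(Pattern.quote(delimiter));
  }

  // Configured values map to 1..values.length; everything else maps to 0.
  int getPartition(String value) {
    for (int i = 0; i < _values.length; i++) {
      if (_values[i].equalsIgnoreCase(value)) {
        return i + 1;
      }
    }
    return 0;
  }

  // Identity mapping: out-of-range inputs pass through unchanged.
  static int noOp(int partitionId, int numPartitions) {
    return partitionId;
  }

  public static void main(String[] args) {
    BoundedLookupSketch fn = new BoundedLookupSketch("Maths|English|Chemistry", "|");
    System.out.println(noOp(fn.getPartition("english"), 4));
    System.out.println(noOp(fn.getPartition("Physics"), 4));
  }
}
```

With three configured values and `numPartitions = 4`, the lookup can only return 0 through 3, so no folding step is needed; the identity mapping only misbehaves if a caller feeds it something the lookup could not have produced.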
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../broker/broker/helix/BaseBrokerStarter.java | 4 +-
.../BoundedColumnValuePartitionFunction.java | 68 +++++---
.../function}/ByteArrayPartitionFunction.java | 37 +++--
.../partition/function}/FnvPartitionFunction.java | 80 ++-------
.../function}/HashCodePartitionFunction.java | 35 ++--
.../function}/ModuloPartitionFunction.java | 45 +++--
.../function}/Murmur3PartitionFunction.java | 35 ++--
.../function}/MurmurPartitionFunction.java | 41 +++--
.../function/PartitionFunctionConfigs.java | 49 ++++++
.../function/PartitionFunctionFactoryTest.java | 169 +++++++++++++++++++
.../partition/function}/PartitionFunctionTest.java | 21 ++-
.../pinot/controller/BaseControllerStarter.java | 7 +-
.../controller/utils/SegmentMetadataMockUtils.java | 4 +-
...gmentImplDropRecordOnPartitionMismatchTest.java | 12 +-
.../local/segment/index/ColumnMetadataTest.java | 2 +-
.../index/creator/SegmentPartitionTest.java | 2 +-
.../segment/spi/partition/PartitionFunction.java | 60 ++++---
.../spi/partition/PartitionFunctionFactory.java | 185 ++++++++++++++-------
.../spi/partition/PartitionIdNormalizer.java | 128 ++++++++++++++
.../spi/partition/PartitionIdNormalizerTest.java | 142 ++++++++++++++++
.../pinot/server/starter/ServerInstance.java | 4 +-
21 files changed, 845 insertions(+), 285 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 6d8acad99a3..c53501a6a8b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -101,6 +101,7 @@ import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
import org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -427,8 +428,9 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
QueryRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES));
LOGGER.info("Initializing ResultRewriterFactory");
ResultRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_RESULT_REWRITER_CLASS_NAMES));
- // Initialize FunctionRegistry before starting the broker request handler
+ // Initialize FunctionRegistry and PartitionFunctionFactory before starting the broker request handler
FunctionRegistry.init();
+ PartitionFunctionFactory.init();
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
_tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java
similarity index 59%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java
index a72beb34c82..aae532663a5 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java
@@ -16,29 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.google.common.base.Preconditions;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
-/**
- * Implementation of {@link PartitionFunction} which partitions based configured column values.
- *
- * "columnPartitionMap": {
- * "subject": {
- * "functionName": "BoundedColumnValue",
- * "functionConfig": {
- * "columnValues": "Maths|English|Chemistry",
- * "columnValuesDelimiter": "|"
- * },
- * "numPartitions": 4
- * }
- * }
- * With this partition config on column "subject", partitionId would be 1 for Maths, 2 for English and 3 for Chemistry.
- * partitionId would be "0" for all other values which may occur, therefore 'numPartitions' is set to 4.
- */
+/// Implementation of [PartitionFunction] which partitions based on configured column values.
+///
+/// ```json
+/// "columnPartitionMap": {
+/// "subject": {
+/// "functionName": "BoundedColumnValue",
+/// "functionConfig": {
+/// "columnValues": "Maths|English|Chemistry",
+/// "columnValuesDelimiter": "|"
+/// },
+/// "numPartitions": 4
+/// }
+/// }
+/// ```
+///
+/// With this partition config on column "subject", `partitionId` is `1` for Maths, `2` for English and `3` for
+/// Chemistry. `partitionId` is `0` for all other values which may occur, therefore `numPartitions` is set to `4`.
public class BoundedColumnValuePartitionFunction implements PartitionFunction {
private static final int DEFAULT_PARTITION_ID = 0;
private static final String NAME = "BoundedColumnValue";
@@ -48,9 +52,17 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
private final Map<String, String> _functionConfig;
private final String[] _values;
- public BoundedColumnValuePartitionFunction(int numPartitions, Map<String, String> functionConfig) {
- Preconditions.checkArgument(functionConfig != null && functionConfig.size() > 0,
- "'functionConfig' should be present, specified", functionConfig);
+ public BoundedColumnValuePartitionFunction(int numPartitions, @Nullable Map<String, String> functionConfig) {
+ _numPartitions = numPartitions;
+ if (functionConfig == null) {
+ // Probe-only path used by PartitionFunctionFactory startup scan; real use supplies a
+ // populated config and reaches the validation below. getPartition() will throw on a
+ // probe-built instance, which is fine because the registry never calls it.
+ _functionConfig = null;
+ _values = null;
+ return;
+ }
+ Preconditions.checkArgument(functionConfig.size() > 0, "'functionConfig' must not be empty");
Preconditions.checkState(functionConfig.get(COLUMN_VALUES) != null, "columnValues must be configured");
Preconditions.checkState(functionConfig.get(COLUMN_VALUES_DELIMITER) != null,
"'columnValuesDelimiter' must be configured");
@@ -58,11 +70,11 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
_values = StringUtils.split(functionConfig.get(COLUMN_VALUES), functionConfig.get(COLUMN_VALUES_DELIMITER));
Preconditions.checkState(numPartitions == _values.length + 1,
"'numPartitions' must just be one greater than number of column values configured");
- _numPartitions = numPartitions;
}
@Override
public int getPartition(String value) {
+ Preconditions.checkState(_values != null, "BoundedColumnValuePartitionFunction is not configured");
for (int i = 0; i < _numPartitions - 1; i++) {
if (_values[i].equalsIgnoreCase(value)) {
return i + 1;
@@ -76,11 +88,9 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
return NAME;
}
- /**
- * Returns number of partitions configured for the column.
- *
- * @return Total number of partitions for the function.
- */
+ /// Returns number of partitions configured for the column.
+ ///
+ /// @return Total number of partitions for the function.
@Override
public int getNumPartitions() {
return _numPartitions;
@@ -91,6 +101,12 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
return _functionConfig;
}
+ @Override
+ public PartitionIdNormalizer getPartitionIdNormalizer() {
+ // Output is a fixed mapping in [0, numPartitions); no normalization is applied.
+ return PartitionIdNormalizer.NO_OP;
+ }
+
@Override
public String toString() {
return getName();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ByteArrayPartitionFunction.java
similarity index 57%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/ByteArrayPartitionFunction.java
index e8ff8edc245..90b1d030d86 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ByteArrayPartitionFunction.java
@@ -16,34 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.google.common.base.Preconditions;
import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
import static java.nio.charset.StandardCharsets.UTF_8;
-/**
- * Implementation of {@link Byte array partitioner}
- *
- */
+/// [PartitionFunction] that hashes the input via [Arrays#hashCode(byte\[\])] of the value
+/// bytes and runs the configured [PartitionIdNormalizer] (default
+/// [PartitionIdNormalizer#PRE_MODULO_ABS], the Pre-modulo abs (Kafka-style) `abs(hash) % N` that maps
+/// `Integer.MIN_VALUE -> 0`) to derive the partition id.
public class ByteArrayPartitionFunction implements PartitionFunction {
private static final String NAME = "ByteArray";
+ private static final PartitionIdNormalizer DEFAULT_NORMALIZER = PartitionIdNormalizer.PRE_MODULO_ABS;
private final int _numPartitions;
+ private final PartitionIdNormalizer _normalizer;
- /**
- * Constructor for the class.
- * @param numPartitions Number of partitions
- */
- public ByteArrayPartitionFunction(int numPartitions) {
- Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, specified", numPartitions);
+ public ByteArrayPartitionFunction(int numPartitions, @Nullable Map<String, String> functionConfig) {
+ Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, was: %s", numPartitions);
_numPartitions = numPartitions;
+ _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, DEFAULT_NORMALIZER);
}
@Override
public int getPartition(String value) {
- return abs(Arrays.hashCode(value.getBytes(UTF_8))) % _numPartitions;
+ return _normalizer.getPartitionId(Arrays.hashCode(value.getBytes(UTF_8)), _numPartitions);
}
@Override
@@ -56,14 +59,14 @@ public class ByteArrayPartitionFunction implements PartitionFunction {
return _numPartitions;
}
+ @Override
+ public PartitionIdNormalizer getPartitionIdNormalizer() {
+ return _normalizer;
+ }
+
// Keep it for backward-compatibility, use getName() instead
@Override
public String toString() {
return NAME;
}
-
- // NOTE: This matches the Utils.abs() in Kafka
- private static int abs(int n) {
- return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
- }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/FnvPartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/FnvPartitionFunction.java
similarity index 57%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/FnvPartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/FnvPartitionFunction.java
index 1f96a2ba645..83fe5a8d5c0 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/FnvPartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/FnvPartitionFunction.java
@@ -16,76 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.google.common.base.Preconditions;
import java.util.Collections;
-import java.util.Locale;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.hash.FnvHashFunctions;
import static java.nio.charset.StandardCharsets.UTF_8;
-/**
- * Stateless and thread-safe {@link PartitionFunction} backed by configurable FNV variants.
- */
+/// Stateless and thread-safe [PartitionFunction] backed by configurable FNV variants. The
+/// configured [PartitionIdNormalizer] (default [PartitionIdNormalizer#MASK]) is applied
+/// to the raw FNV hash to derive the partition id.
public class FnvPartitionFunction implements PartitionFunction {
private static final String NAME = "FNV";
private static final String VARIANT_KEY = "variant";
private static final String USE_RAW_BYTES_KEY = "useRawBytes";
- private static final String NEGATIVE_PARTITION_HANDLING_KEY = "negativePartitionHandling";
private static final FnvHashFunctions.Variant DEFAULT_VARIANT = FnvHashFunctions.Variant.FNV1A_32;
- private static final NegativePartitionHandling DEFAULT_NEGATIVE_PARTITION_HANDLING =
- NegativePartitionHandling.MASK;
+ private static final PartitionIdNormalizer DEFAULT_NORMALIZER = PartitionIdNormalizer.MASK;
private final int _numPartitions;
@Nullable
private final Map<String, String> _functionConfig;
private final FnvHashFunctions.Variant _variant;
private final boolean _useRawBytes;
- private final NegativePartitionHandling _negativePartitionHandling;
-
- private enum NegativePartitionHandling {
- MASK,
- ABS;
-
- private static final String ALLOWED_HANDLINGS = "mask or abs";
-
- public static NegativePartitionHandling fromString(String value) {
- if (value == null) {
- throw invalidHandlingException(null);
- }
- try {
- return valueOf(value.trim().toUpperCase(Locale.ROOT));
- } catch (IllegalArgumentException e) {
- throw invalidHandlingException(value);
- }
- }
-
- public int getPartition(int hash, int numPartitions) {
- if (this == MASK) {
- return (hash & Integer.MAX_VALUE) % numPartitions;
- }
- int partition = hash % numPartitions;
- return partition < 0 ? -partition : partition;
- }
+ private final PartitionIdNormalizer _normalizer;
- public int getPartition(long hash, int numPartitions) {
- if (this == MASK) {
- return (int) ((hash & Long.MAX_VALUE) % numPartitions);
- }
- long partition = hash % numPartitions;
- return (int) (partition < 0 ? -partition : partition);
- }
- }
-
- /**
- * Builds a new FNV partition function from the provided configuration.
- */
public FnvPartitionFunction(int numPartitions, @Nullable Map<String, String> functionConfig) {
Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
_numPartitions = numPartitions;
@@ -93,21 +55,16 @@ public class FnvPartitionFunction implements PartitionFunction {
FnvHashFunctions.Variant variant = DEFAULT_VARIANT;
boolean useRawBytes = false;
- NegativePartitionHandling negativePartitionHandling = DEFAULT_NEGATIVE_PARTITION_HANDLING;
if (functionConfig != null) {
String variantString = functionConfig.get(VARIANT_KEY);
if (StringUtils.isNotBlank(variantString)) {
variant = FnvHashFunctions.Variant.fromString(variantString);
}
useRawBytes = Boolean.parseBoolean(functionConfig.get(USE_RAW_BYTES_KEY));
- String negativePartitionHandlingString = functionConfig.get(NEGATIVE_PARTITION_HANDLING_KEY);
- if (StringUtils.isNotBlank(negativePartitionHandlingString)) {
- negativePartitionHandling = NegativePartitionHandling.fromString(negativePartitionHandlingString);
- }
}
_variant = variant;
_useRawBytes = useRawBytes;
- _negativePartitionHandling = negativePartitionHandling;
+ _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, DEFAULT_NORMALIZER);
}
@Override
@@ -116,12 +73,12 @@ public class FnvPartitionFunction implements PartitionFunction {
if (_variant.is64Bit()) {
long hash = _variant == FnvHashFunctions.Variant.FNV1_64 ? FnvHashFunctions.fnv1Hash64(bytes)
: FnvHashFunctions.fnv1aHash64(bytes);
- return _negativePartitionHandling.getPartition(hash, _numPartitions);
+ return _normalizer.getPartitionId(hash, _numPartitions);
}
int hash = _variant == FnvHashFunctions.Variant.FNV1_32 ? FnvHashFunctions.fnv1Hash32(bytes)
: FnvHashFunctions.fnv1aHash32(bytes);
- return _negativePartitionHandling.getPartition(hash, _numPartitions);
+ return _normalizer.getPartitionId(hash, _numPartitions);
}
@Override
@@ -140,19 +97,14 @@ public class FnvPartitionFunction implements PartitionFunction {
return _functionConfig;
}
+ @Override
+ public PartitionIdNormalizer getPartitionIdNormalizer() {
+ return _normalizer;
+ }
+
// Keep it for backward-compatibility, use getName() instead
@Override
public String toString() {
return NAME;
}
-
- private static IllegalArgumentException invalidHandlingException(@Nullable String value) {
- return new IllegalArgumentException(
- "FNV negative partition handling must be " + NegativePartitionHandling.ALLOWED_HANDLINGS + ", but was: "
- + formatValue(value));
- }
-
- private static String formatValue(@Nullable String value) {
- return value == null ? "null" : "'" + value + "'";
- }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/HashCodePartitionFunction.java
similarity index 56%
rename from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
rename to pinot-common/src/main/java/org/apache/pinot/common/partition/function/HashCodePartitionFunction.java
index 182760cf44c..a6f5c5867d7 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/HashCodePartitionFunction.java
@@ -16,29 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.google.common.base.Preconditions;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
-/**
- * Hash code partition function, where:
- * <ul>
- * <li> partitionId = value.hashCode() % {@link #_numPartitions}</li>
- * </ul>
- */
+/// [PartitionFunction] that hashes the input via [String#hashCode()] and runs the
+/// configured [PartitionIdNormalizer] (default [PartitionIdNormalizer#PRE_MODULO_ABS], the
+/// Pre-modulo abs (Kafka-style) `abs(hash) % N` that maps `Integer.MIN_VALUE -> 0`) to derive the
+/// partition id.
public class HashCodePartitionFunction implements PartitionFunction {
private static final String NAME = "HashCode";
+ private static final PartitionIdNormalizer DEFAULT_NORMALIZER = PartitionIdNormalizer.PRE_MODULO_ABS;
private final int _numPartitions;
+ private final PartitionIdNormalizer _normalizer;
- public HashCodePartitionFunction(int numPartitions) {
- Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, specified", numPartitions);
+ public HashCodePartitionFunction(int numPartitions, @Nullable Map<String, String> functionConfig) {
+ Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, was: %s", numPartitions);
_numPartitions = numPartitions;
+ _normalizer = PartitionFunctionConfigs.normalizer(functionConfig, DEFAULT_NORMALIZER);
}
@Override
public int getPartition(String value) {
- return abs(value.hashCode()) % _numPartitions;
+ return _normalizer.getPartitionId(value.hashCode(), _numPartitions);
}
@Override
@@ -51,14 +56,14 @@ public class HashCodePartitionFunction implements PartitionFunction {
return _numPartitions;
}
+ @Override
+ public PartitionIdNormalizer getPartitionIdNormalizer() {
+ return _normalizer;
+ }
+
// Keep it for backward-compatibility, use getName() instead
@Override
public String toString() {
return NAME;
}
-
- // NOTE: This matches the Utils.abs() in Kafka
- private static int abs(int n) {
- return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
- }
}
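
Note on the normalizer choice above: the default matters only for negative hashes. A minimal sketch, assuming PRE_MODULO_ABS behaves like the removed `abs()` helper and MASK behaves like the `hash & Integer.MAX_VALUE` expression removed from the Murmur functions in this same commit. The class and method names below are hypothetical and are not part of Pinot.

```java
// Hypothetical illustration only; not the PartitionIdNormalizer implementation.
final class HashNormalizationSketch {
  private HashNormalizationSketch() {
  }

  // Mirrors the removed HashCodePartitionFunction.abs(): Integer.MIN_VALUE maps to 0,
  // every other value maps to its absolute value, then the remainder is taken.
  static int preModuloAbs(int hash, int numPartitions) {
    int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
    return nonNegative % numPartitions;
  }

  // Mirrors the removed Murmur expression: clear the sign bit, then take the remainder.
  static int mask(int hash, int numPartitions) {
    return (hash & Integer.MAX_VALUE) % numPartitions;
  }

  public static void main(String[] args) {
    int hash = -7;
    System.out.println(preModuloAbs(hash, 8)); // prints 7
    System.out.println(mask(hash, 8));         // prints 1
  }
}
```

For non-negative hashes the two strategies agree, so switching the normalizer on an existing table would only move keys whose hash is negative.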
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ModuloPartitionFunction.java
similarity index 58%
rename from
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
rename to
pinot-common/src/main/java/org/apache/pinot/common/partition/function/ModuloPartitionFunction.java
index a4b8eb49abc..8d34d855a75 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/ModuloPartitionFunction.java
@@ -16,41 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.google.common.base.Preconditions;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
-/**
- * Modulo operation based partition function, where:
- * <ul>
- * <li> partitionId = value % {@link #_numPartitions}</li>
- * </ul>
- *
- */
+/// Modulo operation based partition function. Treats the input string as a base-10 long and runs
+/// the configured [PartitionIdNormalizer] (default [PartitionIdNormalizer#POSITIVE_MODULO])
+/// over it.
public class ModuloPartitionFunction implements PartitionFunction {
private static final String NAME = "Modulo";
+ private static final PartitionIdNormalizer DEFAULT_NORMALIZER =
PartitionIdNormalizer.POSITIVE_MODULO;
private final int _numPartitions;
+ private final PartitionIdNormalizer _normalizer;
- /**
- * Constructor for the class.
- * @param numPartitions Number of partitions.
- */
- public ModuloPartitionFunction(int numPartitions) {
- Preconditions.checkArgument(numPartitions > 0, "Number of partitions must
be > 0, specified", numPartitions);
+ public ModuloPartitionFunction(int numPartitions, @Nullable Map<String,
String> functionConfig) {
+ Preconditions.checkArgument(numPartitions > 0, "Number of partitions must
be > 0, was: %s", numPartitions);
_numPartitions = numPartitions;
+ _normalizer = PartitionFunctionConfigs.normalizer(functionConfig,
DEFAULT_NORMALIZER);
}
- /**
- * Returns partition id for a given value. Assumes that the passed in object
- * is either an Integer, or a string representation of an Integer.
- *
- * @param value Value for which to determine the partition id.
- * @return Partition id for the given value.
- */
@Override
public int getPartition(String value) {
- return toNonNegative((int) (Long.parseLong(value) % _numPartitions));
+ return _normalizer.getPartitionId(Long.parseLong(value), _numPartitions);
}
@Override
@@ -63,13 +55,14 @@ public class ModuloPartitionFunction implements
PartitionFunction {
return _numPartitions;
}
+ @Override
+ public PartitionIdNormalizer getPartitionIdNormalizer() {
+ return _normalizer;
+ }
+
// Keep it for backward-compatibility, use getName() instead
@Override
public String toString() {
return NAME;
}
-
- private int toNonNegative(int partition) {
- return partition < 0 ? partition + _numPartitions : partition;
- }
}
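
Note on POSITIVE_MODULO: the removed `toNonNegative` helper shows the semantics the new default is expected to preserve. A minimal sketch, assuming POSITIVE_MODULO matches that removed logic exactly; the class name is hypothetical.

```java
// Hypothetical illustration only; mirrors the removed ModuloPartitionFunction logic.
final class PositiveModuloSketch {
  private PositiveModuloSketch() {
  }

  // Java's % keeps the sign of the dividend, so a negative remainder is shifted
  // into [0, numPartitions) by adding numPartitions once.
  static int positiveModulo(long value, int numPartitions) {
    int partition = (int) (value % numPartitions);
    return partition < 0 ? partition + numPartitions : partition;
  }

  public static void main(String[] args) {
    System.out.println(positiveModulo(-10L, 7)); // prints 4
    System.out.println(positiveModulo(10L, 7));  // prints 3
  }
}
```

The input `-10` with 7 partitions is the same case exercised by the Modulo assertions in `PartitionFunctionFactoryTest`.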
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/Murmur3PartitionFunction.java
similarity index 75%
rename from
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
rename to
pinot-common/src/main/java/org/apache/pinot/common/partition/function/Murmur3PartitionFunction.java
index 9a0110f810e..19b35c1aae3 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/Murmur3PartitionFunction.java
@@ -16,39 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
import static java.nio.charset.StandardCharsets.UTF_8;
-/**
- * Implementation of {@link PartitionFunction} which partitions based on 32
bit murmur3 hash
- */
+/// [PartitionFunction] backed by a 32-bit Murmur3 hash. The configured
+/// [PartitionIdNormalizer] (default [PartitionIdNormalizer#MASK]) is applied to the
+/// raw signed hash to derive the partition id.
public class Murmur3PartitionFunction implements PartitionFunction {
private static final String NAME = "Murmur3";
private static final String SEED_KEY = "seed";
private static final String VARIANT_KEY = "variant";
private static final String USE_RAW_BYTES_KEY = "useRawBytes";
+ private static final PartitionIdNormalizer DEFAULT_NORMALIZER =
PartitionIdNormalizer.MASK;
+
private final int _numPartitions;
@Nullable
private final Map<String, String> _functionConfig;
private final int _seed;
private final boolean _useX64;
private final boolean _useRawBytes;
+ private final PartitionIdNormalizer _normalizer;
- /**
- * Constructor for the class.
- * @param numPartitions Number of partitions.
- * @param functionConfig to extract configurations for the partition
function.
- */
public Murmur3PartitionFunction(int numPartitions, @Nullable Map<String,
String> functionConfig) {
Preconditions.checkArgument(numPartitions > 0, "Number of partitions must
be > 0");
_numPartitions = numPartitions;
@@ -76,19 +76,21 @@ public class Murmur3PartitionFunction implements
PartitionFunction {
_seed = seed;
_useX64 = useX64;
_useRawBytes = useRawBytes;
+ _normalizer = PartitionFunctionConfigs.normalizer(functionConfig,
DEFAULT_NORMALIZER);
}
@Override
public int getPartition(String value) {
+ int hash;
if (_useRawBytes) {
byte[] bytes = BytesUtils.toBytes(value);
- int hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(bytes,
_seed)
+ hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(bytes, _seed)
: MurmurHashFunctions.murmurHash3X86Bit32(bytes, _seed);
- return (hash & Integer.MAX_VALUE) % _numPartitions;
+ } else {
+ hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(value, _seed)
+ : MurmurHashFunctions.murmurHash3X86Bit32(value.getBytes(UTF_8),
_seed);
}
- int hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(value, _seed)
- : MurmurHashFunctions.murmurHash3X86Bit32(value.getBytes(UTF_8),
_seed);
- return (hash & Integer.MAX_VALUE) % _numPartitions;
+ return _normalizer.getPartitionId(hash, _numPartitions);
}
@Override
@@ -107,6 +109,11 @@ public class Murmur3PartitionFunction implements
PartitionFunction {
return _functionConfig;
}
+ @Override
+ public PartitionIdNormalizer getPartitionIdNormalizer() {
+ return _normalizer;
+ }
+
// Keep it for backward-compatibility, use getName() instead
@Override
public String toString() {
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/MurmurPartitionFunction.java
similarity index 69%
rename from
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
rename to
pinot-common/src/main/java/org/apache/pinot/common/partition/function/MurmurPartitionFunction.java
index 6c64dee763a..5396fbe07cc 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/MurmurPartitionFunction.java
@@ -16,53 +16,48 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.google.common.base.Preconditions;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
import static java.nio.charset.StandardCharsets.UTF_8;
-/**
- * Implementation of {@link PartitionFunction} which partitions based on 32
bit murmur hash
- */
+/// [PartitionFunction] backed by a 32-bit Murmur2 hash. The configured
+/// [PartitionIdNormalizer] (default [PartitionIdNormalizer#MASK]) is applied to the
+/// raw signed hash to derive the partition id. Registered under both `Murmur` and `Murmur2`.
public class MurmurPartitionFunction implements PartitionFunction {
private static final String NAME = "Murmur";
+ private static final List<String> NAMES = List.of("Murmur", "Murmur2");
private static final String USE_RAW_BYTES_KEY = "useRawBytes";
+ private static final PartitionIdNormalizer DEFAULT_NORMALIZER =
PartitionIdNormalizer.MASK;
+
private final int _numPartitions;
@Nullable
private final Map<String, String> _functionConfig;
private final boolean _useRawBytes;
+ private final PartitionIdNormalizer _normalizer;
- /**
- * Constructor for backward compatibility.
- * @param numPartitions Number of partitions.
- */
- public MurmurPartitionFunction(int numPartitions) {
- this(numPartitions, null);
- }
-
- /**
- * Constructor for the class.
- * @param numPartitions Number of partitions.
- * @param functionConfig to extract configurations for the partition
function.
- */
public MurmurPartitionFunction(int numPartitions, @Nullable Map<String,
String> functionConfig) {
Preconditions.checkArgument(numPartitions > 0, "Number of partitions must
be > 0");
_numPartitions = numPartitions;
_functionConfig = functionConfig != null ?
Collections.unmodifiableMap(functionConfig) : null;
_useRawBytes = functionConfig != null &&
Boolean.parseBoolean(functionConfig.get(USE_RAW_BYTES_KEY));
+ _normalizer = PartitionFunctionConfigs.normalizer(functionConfig,
DEFAULT_NORMALIZER);
}
@Override
public int getPartition(String value) {
byte[] bytes = _useRawBytes ? BytesUtils.toBytes(value) :
value.getBytes(UTF_8);
- return (MurmurHashFunctions.murmurHash2(bytes) & Integer.MAX_VALUE) %
_numPartitions;
+ return _normalizer.getPartitionId(MurmurHashFunctions.murmurHash2(bytes),
_numPartitions);
}
@Override
@@ -70,6 +65,11 @@ public class MurmurPartitionFunction implements
PartitionFunction {
return NAME;
}
+ @Override
+ public List<String> getNames() {
+ return NAMES;
+ }
+
@Override
public int getNumPartitions() {
return _numPartitions;
@@ -81,6 +81,11 @@ public class MurmurPartitionFunction implements
PartitionFunction {
return _functionConfig;
}
+ @Override
+ public PartitionIdNormalizer getPartitionIdNormalizer() {
+ return _normalizer;
+ }
+
// Keep it for backward-compatibility, use getName() instead
@Override
public String toString() {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/partition/function/PartitionFunctionConfigs.java
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/PartitionFunctionConfigs.java
new file mode 100644
index 00000000000..721025dd1d1
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/partition/function/PartitionFunctionConfigs.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition.function;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
+
+
+/// Shared config-parsing helpers for built-in partition functions.
+final class PartitionFunctionConfigs {
+ /// Config key under which an explicit [PartitionIdNormalizer] can be selected.
+ static final String PARTITION_ID_NORMALIZER_KEY = "partitionIdNormalizer";
+
+ private PartitionFunctionConfigs() {
+ }
+
+ /// Reads [#PARTITION_ID_NORMALIZER_KEY] from the function config and resolves it to a
+ /// [PartitionIdNormalizer]. Returns `defaultNormalizer` when the config is absent or
+ /// the value is blank. Throws [IllegalArgumentException] on an unrecognized value.
+ static PartitionIdNormalizer normalizer(@Nullable Map<String, String>
functionConfig,
+ PartitionIdNormalizer defaultNormalizer) {
+ if (functionConfig == null) {
+ return defaultNormalizer;
+ }
+ String raw = functionConfig.get(PARTITION_ID_NORMALIZER_KEY);
+ if (StringUtils.isBlank(raw)) {
+ return defaultNormalizer;
+ }
+ return PartitionIdNormalizer.fromConfigString(raw);
+ }
+}
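
Note on the lookup above: the fallback rules can be sketched in isolation. This is a minimal sketch under stated assumptions. `Normalizer` is a hypothetical stand-in for `PartitionIdNormalizer`, and its case-insensitive `fromConfigString` is an assumption inferred from the tests passing both "abs" and "MASK"; the real parsing rules are not shown in this part of the diff.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only; not the Pinot classes.
final class NormalizerConfigSketch {
  private NormalizerConfigSketch() {
  }

  // Stand-in enum; constant names are taken from the normalizers referenced in the tests.
  enum Normalizer {
    POSITIVE_MODULO, ABS, MASK, PRE_MODULO_ABS, NO_OP;

    // Assumption: case-insensitive match. Enum.valueOf throws IllegalArgumentException
    // for an unrecognized name.
    static Normalizer fromConfigString(String raw) {
      return valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
  }

  static final String KEY = "partitionIdNormalizer";

  // Null config, missing key, or blank value falls back to the per-function default.
  static Normalizer resolve(Map<String, String> functionConfig, Normalizer defaultNormalizer) {
    if (functionConfig == null) {
      return defaultNormalizer;
    }
    String raw = functionConfig.get(KEY);
    if (raw == null || raw.isBlank()) {
      return defaultNormalizer;
    }
    return Normalizer.fromConfigString(raw);
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, Normalizer.MASK));                 // prints MASK
    System.out.println(resolve(Map.of(KEY, "abs"), Normalizer.MASK));   // prints ABS
  }
}
```

Keeping the default as a parameter lets each function declare its own legacy behavior while sharing one config key.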
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionFactoryTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionFactoryTest.java
new file mode 100644
index 00000000000..85cae7e8730
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionFactoryTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.partition.function;
+
+import java.util.Map;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.PartitionIdNormalizer;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Coverage for [PartitionFunctionFactory] dynamic registry behavior.
+public class PartitionFunctionFactoryTest {
+
+ @Test
+ public void testAllBuiltInFunctionsRegistered() {
+ // Resolves every built-in name. The test fails if the classpath subtype scan misses any impl
+ // or if its `getNames()` doesn't surface the expected canonical name.
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("Modulo", 4, null)
instanceof ModuloPartitionFunction);
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("Murmur", 4, null)
instanceof MurmurPartitionFunction);
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("Murmur2", 4, null)
instanceof MurmurPartitionFunction);
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("Murmur3", 4, null)
instanceof Murmur3PartitionFunction);
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("Fnv", 4, null)
instanceof FnvPartitionFunction);
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("HashCode", 4, null)
instanceof HashCodePartitionFunction);
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("ByteArray", 4, null)
instanceof ByteArrayPartitionFunction);
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("BoundedColumnValue", 2,
+ Map.of("columnValues", "a", "columnValuesDelimiter", "|"))
+ instanceof BoundedColumnValuePartitionFunction);
+ }
+
+ @Test
+ public void testCaseInsensitiveLookup() {
+ // Names are matched after lower-casing. Both spellings resolve to the same impl class.
+ PartitionFunction lowerCase =
PartitionFunctionFactory.getPartitionFunction("murmur3", 8, null);
+ PartitionFunction mixedCase =
PartitionFunctionFactory.getPartitionFunction("MuRmUr3", 8, null);
+ assertEquals(lowerCase.getClass(), Murmur3PartitionFunction.class);
+ assertEquals(mixedCase.getClass(), Murmur3PartitionFunction.class);
+ }
+
+ @Test
+ public void testMurmurAndMurmur2AliasResolveToSameClass() {
+ // MurmurPartitionFunction overrides getNames() to return ["Murmur", "Murmur2"] so both
+ // names register against the same impl.
+ assertEquals(PartitionFunctionFactory.getPartitionFunction("Murmur", 4,
null).getClass(),
+ PartitionFunctionFactory.getPartitionFunction("Murmur2", 4,
null).getClass());
+ }
+
+ @Test
+ public void testGetNamesDefaultsToSingletonOfGetName() {
+ // The interface's default getNames() returns [getName()]. Verify a non-overriding impl
+ // surfaces a single-entry list whose only entry equals getName().
+ Murmur3PartitionFunction fn = new Murmur3PartitionFunction(4, null);
+ assertEquals(fn.getNames(), java.util.List.of(fn.getName()));
+ }
+
+ @Test
+ public void testMurmurOverridesGetNamesWithTwoAliases() {
+ // MurmurPartitionFunction is the only built-in that overrides getNames(); verify the
+ // override surfaces both aliases.
+ MurmurPartitionFunction fn = new MurmurPartitionFunction(4, null);
+ assertEquals(fn.getNames(), java.util.List.of("Murmur", "Murmur2"));
+ }
+
+ @Test
+ public void testUnknownNameThrows() {
+ IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+ () -> PartitionFunctionFactory.getPartitionFunction("DoesNotExist", 4,
null));
+ assertTrue(e.getMessage().contains("DoesNotExist"));
+ }
+
+ @Test
+ public void testInitIsIdempotent() {
+ // Multiple components in the same JVM (e.g. controller + embedded broker in a quickstart) call
+ // init() independently. Repeated calls must not blow up.
+ PartitionFunctionFactory.init();
+ PartitionFunctionFactory.init();
+ PartitionFunctionFactory.init();
+ assertTrue(
+ PartitionFunctionFactory.getPartitionFunction("Modulo", 4, null)
instanceof ModuloPartitionFunction);
+ }
+
+ @Test
+ public void testGetPartitionIdNormalizerPerImpl() {
+ // Locks the descriptive normalizer label that each built-in impl reports.
+ assertEquals(new ModuloPartitionFunction(4,
null).getPartitionIdNormalizer(),
+ PartitionIdNormalizer.POSITIVE_MODULO);
+ assertEquals(new MurmurPartitionFunction(4,
null).getPartitionIdNormalizer(),
+ PartitionIdNormalizer.MASK);
+ assertEquals(new Murmur3PartitionFunction(4,
null).getPartitionIdNormalizer(),
+ PartitionIdNormalizer.MASK);
+ assertEquals(new HashCodePartitionFunction(4,
null).getPartitionIdNormalizer(),
+ PartitionIdNormalizer.PRE_MODULO_ABS);
+ assertEquals(new ByteArrayPartitionFunction(4,
null).getPartitionIdNormalizer(),
+ PartitionIdNormalizer.PRE_MODULO_ABS);
+ // FNV defaults to MASK; any normalizer is selectable through the partitionIdNormalizer config.
+ assertEquals(new FnvPartitionFunction(4, null).getPartitionIdNormalizer(),
+ PartitionIdNormalizer.MASK);
+ assertEquals(new FnvPartitionFunction(4, Map.of("partitionIdNormalizer",
"abs")).getPartitionIdNormalizer(),
+ PartitionIdNormalizer.ABS);
+ // BoundedColumnValue's output is already in [0, N); reports NO_OP (identity).
+ PartitionFunction boundedColumnValue = new
BoundedColumnValuePartitionFunction(2,
+ Map.of("columnValues", "a", "columnValuesDelimiter", "|"));
+ assertEquals(boundedColumnValue.getPartitionIdNormalizer(),
PartitionIdNormalizer.NO_OP);
+ }
+
+ @Test
+ public void testPartitionIdNormalizerConfigOverridesDefaultAcrossImpls() {
+ // Every impl exposes the same `partitionIdNormalizer` config key. Verify that overriding the
+ // default rewires the actual partition-id computation (not just the reported label).
+ Map<String, String> mask = Map.of("partitionIdNormalizer", "MASK");
+
+ // HashCode: configured normalizer drives the output. Pick a value whose hashCode is negative
+ // (sweep until found) so PRE_MODULO_ABS vs MASK produces observably different partition ids.
+ String negativeHashValue = null;
+ int negativeHash = 0;
+ for (int i = 0; i < 1000 && negativeHashValue == null; i++) {
+ String candidate = "value-" + i;
+ if (candidate.hashCode() < 0) {
+ negativeHashValue = candidate;
+ negativeHash = candidate.hashCode();
+ }
+ }
+ assertTrue(negativeHashValue != null, "Failed to find a string with a
negative hashCode in the search range");
+ assertEquals(new HashCodePartitionFunction(8,
null).getPartition(negativeHashValue),
+ PartitionIdNormalizer.PRE_MODULO_ABS.getPartitionId(negativeHash, 8));
+ assertEquals(new HashCodePartitionFunction(8,
mask).getPartition(negativeHashValue),
+ PartitionIdNormalizer.MASK.getPartitionId(negativeHash, 8));
+
+ // Modulo: explicit MASK on a negative input differs from the default POSITIVE_MODULO output.
+ long signedValue = -10L;
+ int posMod = new ModuloPartitionFunction(7,
null).getPartition(Long.toString(signedValue));
+ int maskMod = new ModuloPartitionFunction(7,
mask).getPartition(Long.toString(signedValue));
+ assertEquals(posMod,
PartitionIdNormalizer.POSITIVE_MODULO.getPartitionId(signedValue, 7));
+ assertEquals(maskMod,
PartitionIdNormalizer.MASK.getPartitionId(signedValue, 7));
+
+ // ByteArray: PRE_MODULO_ABS default; verify the override label round-trips on the SPI.
+ PartitionFunction byteArrayWithKafkaAbs = new ByteArrayPartitionFunction(4,
+ Map.of("partitionIdNormalizer", "PRE_MODULO_ABS"));
+ assertEquals(byteArrayWithKafkaAbs.getPartitionIdNormalizer(),
PartitionIdNormalizer.PRE_MODULO_ABS);
+ }
+}
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionTest.java
similarity index 98%
rename from
pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
rename to
pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionTest.java
index aa4537a23ac..3c852f7587c 100644
---
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/partition/function/PartitionFunctionTest.java
@@ -16,12 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.partition;
+package org.apache.pinot.common.partition.function;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.hash.FnvHashFunctions;
@@ -443,9 +445,10 @@ public class PartitionFunctionTest {
assertEquals(partitionFunction.getNumPartitions(), numPartitions);
JsonNode jsonNode = JsonUtils.objectToJsonNode(partitionFunction);
- assertEquals(jsonNode.size(), 3);
+ assertEquals(jsonNode.size(), 4);
assertEquals(jsonNode.get("name").asText().toLowerCase(),
functionName.toLowerCase());
assertEquals(jsonNode.get("numPartitions").asInt(), numPartitions);
+ assertTrue(jsonNode.has("partitionIdNormalizer"));
JsonNode functionConfigNode = jsonNode.get("functionConfig");
if (functionConfig == null) {
@@ -502,7 +505,7 @@ public class PartitionFunctionTest {
// initialized {@link MurmurPartitionFunction} with 5 partitions
int numPartitions = 5;
- MurmurPartitionFunction murmurPartitionFunction = new
MurmurPartitionFunction(numPartitions);
+ MurmurPartitionFunction murmurPartitionFunction = new
MurmurPartitionFunction(numPartitions, null);
// generate the same 10 String values
// Apply the partition function and compare with stored results
@@ -618,7 +621,7 @@ public class PartitionFunctionTest {
assertEquals(new FnvPartitionFunction(numPartitions,
null).getPartition(value), javaCompatiblePartition);
Map<String, String> functionConfig = new HashMap<>();
- functionConfig.put("negativePartitionHandling", "abs");
+ functionConfig.put("partitionIdNormalizer", "abs");
assertEquals(new FnvPartitionFunction(numPartitions,
functionConfig).getPartition(value), saramaCompatPartition);
}
@@ -636,19 +639,19 @@ public class PartitionFunctionTest {
functionConfig.put("variant", "fnv1a_64");
assertEquals(new FnvPartitionFunction(numPartitions,
functionConfig).getPartition(value), javaCompatiblePartition);
- functionConfig.put("negativePartitionHandling", "abs");
+ functionConfig.put("partitionIdNormalizer", "abs");
assertEquals(new FnvPartitionFunction(numPartitions,
functionConfig).getPartition(value), saramaCompatPartition);
}
@Test
public void
testFnvPartitionFunctionRejectsInvalidNegativePartitionHandling() {
Map<String, String> functionConfig = new HashMap<>();
- functionConfig.put("negativePartitionHandling", "saramaCompat");
+ functionConfig.put("partitionIdNormalizer", "saramaCompat");
IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class, () -> new
FnvPartitionFunction(4, functionConfig));
- assertEquals(exception.getMessage(),
- "FNV negative partition handling must be mask or abs, but was:
'saramaCompat'");
+ assertTrue(exception.getMessage().contains("saramaCompat"),
+ "Expected error to mention the offending value, was: " +
exception.getMessage());
}
@Test
@@ -768,7 +771,7 @@ public class PartitionFunctionTest {
// initialized {@link ByteArrayPartitionFunction} with 5 partitions
int numPartitions = 5;
- ByteArrayPartitionFunction byteArrayPartitionFunction = new
ByteArrayPartitionFunction(numPartitions);
+ ByteArrayPartitionFunction byteArrayPartitionFunction = new
ByteArrayPartitionFunction(numPartitions, null);
// generate the same 10 String values
// Apply the partition function and compare with stored results
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index f266b490666..51807d8de87 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -145,6 +145,7 @@ import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.instance.InstanceConfigValidatorRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableConfigValidatorRegistry;
@@ -287,9 +288,11 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
_helixResourceManager = null;
_executorService = null;
} else {
- // Initialize FunctionRegistry before starting the admin application (PinotQueryResource requires it to compile
- // queries)
+ // Initialize FunctionRegistry and PartitionFunctionFactory before starting the admin application
+ // (PinotQueryResource requires the function registry to compile queries; the partition factory must be
+ // populated before any segment metadata is read)
FunctionRegistry.init();
+ PartitionFunctionFactory.init();
_adminApp = createControllerAdminApp();
// This executor service is used to do async tasks from multiget util or
table rebalancing.
_executorService =
createExecutorService(_config.getControllerExecutorNumThreads(),
"async-task-thread-%d");
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index a434327e291..99b5b23b7db 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -23,10 +23,10 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.partition.function.MurmurPartitionFunction;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.partition.MurmurPartitionFunction;
import org.joda.time.Interval;
import org.mockito.Mockito;
@@ -89,7 +89,7 @@ public class SegmentMetadataMockUtils {
ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
Set<Integer> partitions = Collections.singleton(partitionNumber);
when(columnMetadata.getPartitions()).thenReturn(partitions);
- when(columnMetadata.getPartitionFunction()).thenReturn(new
MurmurPartitionFunction(5));
+ when(columnMetadata.getPartitionFunction()).thenReturn(new
MurmurPartitionFunction(5, null));
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
if (columnName != null) {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
index 400788c1b2c..65f546dc961 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplDropRecordOnPartitionMismatchTest.java
@@ -19,7 +19,7 @@
package org.apache.pinot.segment.local.indexsegment.mutable;
import java.io.IOException;
-import org.apache.pinot.segment.spi.partition.ModuloPartitionFunction;
+import org.apache.pinot.common.partition.function.ModuloPartitionFunction;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -57,7 +57,7 @@ public class
MutableSegmentImplDropRecordOnPartitionMismatchTest {
public void testRecordsMatchingPartitionAreIndexed()
throws IOException {
_segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA,
PARTITION_COLUMN,
- new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+ new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID,
true);
// memberId values 0, 4, 8 all map to partition 0
indexRow(_segment, 0, 100);
@@ -71,7 +71,7 @@ public class MutableSegmentImplDropRecordOnPartitionMismatchTest {
public void testRecordsMismatchingPartitionAreDropped()
throws IOException {
_segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, PARTITION_COLUMN,
- new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+ new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, true);
// memberId values 1, 2, 3 map to partitions 1, 2, 3 — all mismatches
indexRow(_segment, 1, 200);
@@ -85,7 +85,7 @@ public class MutableSegmentImplDropRecordOnPartitionMismatchTest {
public void testMixedPartitionsOnlyIndexMatchingRecords()
throws IOException {
_segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, PARTITION_COLUMN,
- new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+ new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, true);
indexRow(_segment, 0, 100); // partition 0 — indexed
indexRow(_segment, 1, 101); // partition 1 — dropped
@@ -99,7 +99,7 @@ public class MutableSegmentImplDropRecordOnPartitionMismatchTest {
public void testConfigDisabledIndexesAllRecordsRegardlessOfPartition()
throws IOException {
_segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, PARTITION_COLUMN,
- new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, false);
+ new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, false);
indexRow(_segment, 0, 100); // partition 0
indexRow(_segment, 1, 101); // partition 1
@@ -113,7 +113,7 @@ public class MutableSegmentImplDropRecordOnPartitionMismatchTest {
public void testNullPartitionColumnValueThrowsException()
throws IOException {
_segment = MutableSegmentImplTestUtils.createMutableSegmentImpl(SCHEMA, PARTITION_COLUMN,
- new ModuloPartitionFunction(NUM_PARTITIONS), MAIN_PARTITION_ID, true);
+ new ModuloPartitionFunction(NUM_PARTITIONS, null), MAIN_PARTITION_ID, true);
GenericRow row = new GenericRow();
row.putValue(PARTITION_COLUMN, null);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
index b00c69b87b4..66c87da4d6e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
@@ -30,6 +30,7 @@ import java.util.stream.Stream;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.partition.function.BoundedColumnValuePartitionFunction;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -42,7 +43,6 @@ import org.apache.pinot.segment.spi.index.IndexService;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.partition.BoundedColumnValuePartitionFunction;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
index 715e2af45b3..8f318e9b071 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.partition.function.ModuloPartitionFunction;
import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -36,7 +37,6 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.partition.ModuloPartitionFunction;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
index 3e6d26146d6..c563a4ee3d7 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
@@ -18,43 +18,59 @@
*/
package org.apache.pinot.segment.spi.partition;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-/**
- * Interface for partition function.
- *
- * Implementations of this interface are assumed not to be stateful.
- * That is, two invocations of {@code PartitionFunction.getPartition(value)}
- * with the same value are expected to produce the same result.
- */
+/// Interface for partition function.
+///
+/// Implementations of this interface are assumed not to be stateful. That is, two invocations of
+/// `PartitionFunction.getPartition(value)` with the same value are expected to produce the same
+/// result. Implementations must also be safe for concurrent invocation by multiple threads.
public interface PartitionFunction extends Serializable {
- /**
- * Method to compute and return partition id for the given value.
- * NOTE: The value is expected to be a string representation of the actual value.
- *
- * @param value Value for which to determine the partition id.
- * @return partition id for the value.
- */
+ /// Method to compute and return partition id for the given value.
+ /// NOTE: The value is expected to be a string representation of the actual value.
+ ///
+ /// @param value Value for which to determine the partition id.
+ /// @return partition id for the value.
int getPartition(String value);
- /**
- * Returns the name of the partition function.
- * @return Name of the partition function.
- */
+ /// Returns the canonical name of the partition function.
+ ///
+ /// @return Name of the partition function.
String getName();
- /**
- * Returns the total number of possible partitions.
- * @return Number of possible partitions.
- */
+ /// Returns every name (canonical + aliases) under which this partition function should be
+ /// registered with `PartitionFunctionFactory`. Defaults to a single-entry list containing
+ /// [#getName()]. Override only when you want additional aliases — e.g.
+ /// `MurmurPartitionFunction` registers under both `Murmur` and `Murmur2`.
+ @JsonIgnore
+ default List<String> getNames() {
+ return Collections.singletonList(getName());
+ }
+
+ /// Returns the total number of possible partitions.
+ ///
+ /// @return Number of possible partitions.
int getNumPartitions();
@Nullable
default Map<String, String> getFunctionConfig() {
return null;
}
+
+ /// Reports the [PartitionIdNormalizer] that describes this partition function's int-to-id
+ /// mapping. The framework uses this for identity / staleness matching between config-side and
+ /// segment-side partition metadata, and (for the built-in implementations) as the actual driver
+ /// of the int-to-id computation.
+ ///
+ /// Each implementation must declare its own value — there is intentionally no default. Plug-ins
+ /// whose output is already in `[0, numPartitions)` (e.g. lookup-style functions) should return
+ /// [PartitionIdNormalizer#POSITIVE_MODULO] (a no-op label).
+ PartitionIdNormalizer getPartitionIdNormalizer();
}
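The `getNames()` default added above can be illustrated in isolation. The following is a minimal sketch, assuming a stand-in interface: `NamedFunction`, `SingleName` and `WithAlias` are hypothetical names used only for illustration and are not Pinot types. It shows a default method returning a single-entry list and an override that adds an alias.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not the Pinot classes.
class NamesSketch {
  interface NamedFunction {
    String getName();

    // Default: the canonical name is the only registered name.
    default List<String> getNames() {
      return Collections.singletonList(getName());
    }
  }

  // Relies on the default, so it is reachable under one name.
  static final class SingleName implements NamedFunction {
    @Override
    public String getName() {
      return "Modulo";
    }
  }

  // Overrides getNames() to expose the canonical name plus one alias.
  static final class WithAlias implements NamedFunction {
    @Override
    public String getName() {
      return "Murmur";
    }

    @Override
    public List<String> getNames() {
      return Collections.unmodifiableList(Arrays.asList("Murmur", "Murmur2"));
    }
  }

  public static void main(String[] args) {
    System.out.println(new SingleName().getNames());
    System.out.println(new WithAlias().getNames());
  }
}
```

Both stand-in classes hold no mutable state, which is in line with the statelessness and thread-safety expectations stated in the interface Javadoc.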
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
index 43890a614c1..af25b5976d1 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
@@ -18,86 +18,147 @@
*/
package org.apache.pinot.segment.spi.partition;
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-
-
-/**
- * Factory to build instances of {@link PartitionFunction}.
- */
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Dynamic registry for [PartitionFunction] implementations.
+///
+/// Discovery walks every public, concrete [PartitionFunction] subtype on the classpath under the
+/// `org.apache.pinot.*` package tree and registers each under the names returned by
+/// [PartitionFunction#getNames()] (defaults to `[getName()]`; override to declare aliases — e.g.
+/// `MurmurPartitionFunction` registers under both `Murmur` and `Murmur2`).
+///
+/// Each registrable class must be public, concrete, and expose a public constructor with signature
+/// `(int numPartitions, Map<String, String> functionConfig)` — the constructor used both for the
+/// startup `getNames()` probe (called with `(1, null)`) and for [#getPartitionFunction(String, int,
+/// Map)] at lookup time.
+///
+/// The static block scans the classpath once and builds an immutable (canonicalized name →
+/// constructor) map. To force eager initialization (e.g. so the scan happens before the first
+/// segment is read), call [#init()] from broker / server / controller startup.
public class PartitionFunctionFactory {
- // Enum for various partition functions to be added.
- public enum PartitionFunctionType {
- Modulo, Murmur, Murmur2, Murmur3, Fnv, ByteArray, HashCode, BoundedColumnValue;
- // Add more functions here.
+ private PartitionFunctionFactory() {
+ }
- private static final Map<String, PartitionFunctionType> VALUE_MAP = new HashMap<>();
+ private static final Logger LOGGER = LoggerFactory.getLogger(PartitionFunctionFactory.class);
+ private static final String SCAN_PACKAGE = "org.apache.pinot";
- static {
- for (PartitionFunctionType functionType : PartitionFunctionType.values()) {
- VALUE_MAP.put(functionType.name().toLowerCase(), functionType);
+ private static final Map<String, Constructor<? extends PartitionFunction>> REGISTRY;
+
+ static {
+ long startTimeMs = System.currentTimeMillis();
+ Map<String, Constructor<? extends PartitionFunction>> registry = new HashMap<>();
+ Set<Class<? extends PartitionFunction>> subtypes = scanSubtypes();
+ for (Class<? extends PartitionFunction> clazz : subtypes) {
+ int mods = clazz.getModifiers();
+ if (!Modifier.isPublic(mods) || Modifier.isAbstract(mods) || clazz.isInterface()) {
+ continue;
+ }
+ Constructor<? extends PartitionFunction> constructor;
+ try {
+ constructor = clazz.getConstructor(int.class, Map.class);
+ } catch (NoSuchMethodException e) {
+ LOGGER.warn("Skipping {}: missing public constructor (int, Map<String, String>)", clazz.getName());
+ continue;
+ }
+ List<String> names = probeNames(clazz, constructor);
+ if (names == null) {
+ continue;
+ }
+ for (String name : names) {
+ if (name == null || name.trim().isEmpty()) {
+ LOGGER.warn("Skipping blank name for {}", clazz.getName());
+ continue;
+ }
+ String canonical = canonicalize(name.trim());
+ Constructor<? extends PartitionFunction> existing = registry.put(canonical, constructor);
+ Preconditions.checkState(existing == null || existing.getDeclaringClass().equals(clazz),
+ "Partition function name '%s' is registered to both %s and %s", name,
+ existing == null ? null : existing.getDeclaringClass().getName(), clazz.getName());
}
}
+ REGISTRY = Collections.unmodifiableMap(registry);
+ LOGGER.info("Initialized PartitionFunctionFactory with {} functions: {} in {}ms", REGISTRY.size(),
+ REGISTRY.keySet(), System.currentTimeMillis() - startTimeMs);
+ }
- public static PartitionFunctionType fromString(String name) {
- PartitionFunctionType functionType = VALUE_MAP.get(name.toLowerCase());
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static Set<Class<? extends PartitionFunction>> scanSubtypes() {
+ final Set<?>[] result = new Set<?>[1];
+ PinotReflectionUtils.runWithLock(() ->
+ result[0] = new Reflections(new ConfigurationBuilder()
+ .setUrls(ClasspathHelper.forPackage(SCAN_PACKAGE))
+ .setScanners(new SubTypesScanner()))
+ .getSubTypesOf(PartitionFunction.class));
+ return (Set) result[0];
+ }
- if (functionType == null) {
- throw new IllegalArgumentException("No enum constant for: " + name);
+ /// Instantiates `clazz` with `(numPartitions = 1, functionConfig = null)` and returns the
+ /// names list reported by [PartitionFunction#getNames()]. Returns `null` (with a warning log)
+ /// when the probe construction fails — typically a function whose ctor requires non-null config
+ /// (e.g. `BoundedColumnValuePartitionFunction`); such functions need to either supply a usable
+ /// default config or skip auto-registration.
+ @Nullable
+ private static List<String> probeNames(Class<? extends PartitionFunction> clazz,
+ Constructor<? extends PartitionFunction> constructor) {
+ try {
+ PartitionFunction probe = constructor.newInstance(1, null);
+ List<String> names = probe.getNames();
+ if (names == null || names.isEmpty()) {
+ LOGGER.warn("Skipping {}: getNames() returned null/empty", clazz.getName());
+ return null;
}
- return functionType;
+ return names;
+ } catch (ReflectiveOperationException e) {
+ LOGGER.warn("Skipping {}: probing getNames() with (1, null) failed: {}", clazz.getName(),
+ e.getCause() != null ? e.getCause().getMessage() : e.getMessage());
+ return null;
}
}
- /**
- * Private constructor so that the class cannot be instantiated.
- */
- private PartitionFunctionFactory() {
+ /// No-op call that exists to force the static initializer of this class to run. Mirrors
+ /// `FunctionRegistry.init()` so callers can eagerly trigger classpath scanning during
+ /// service startup instead of paying the cost on the first partition function lookup.
+ public static void init() {
}
- /**
- * This method generates and returns a partition function based on the provided string.
- *
- * @param functionName Name of partition function
- * @param numPartitions Number of partitions.
- * @param functionConfig The function configuration for given function.
- * @return Partition function
- */
- // TODO: introduce a way to inject custom partition function
- // a custom partition function could be used in the realtime stream partitioning or offline segment partitioning.
- // The PartitionFunctionFactory should be able to support these default implementations, as well as instantiate
- // based on config
+ /// Builds an instance of the partition function registered under `functionName`.
+ ///
+ /// @param functionName matched case-insensitively
+ /// @param numPartitions positive partition count
+ /// @param functionConfig optional, function-specific configuration; may be `null`
public static PartitionFunction getPartitionFunction(String functionName, int numPartitions,
@Nullable Map<String, String> functionConfig) {
- PartitionFunctionType function = PartitionFunctionType.fromString(functionName);
- switch (function) {
- case Modulo:
- return new ModuloPartitionFunction(numPartitions);
-
- case Murmur:
- case Murmur2:
- return new MurmurPartitionFunction(numPartitions, functionConfig);
-
- case Murmur3:
- return new Murmur3PartitionFunction(numPartitions, functionConfig);
-
- case Fnv:
- return new FnvPartitionFunction(numPartitions, functionConfig);
-
- case ByteArray:
- return new ByteArrayPartitionFunction(numPartitions);
-
- case HashCode:
- return new HashCodePartitionFunction(numPartitions);
-
- case BoundedColumnValue:
- return new BoundedColumnValuePartitionFunction(numPartitions, functionConfig);
-
- default:
- throw new IllegalArgumentException("Illegal partition function name: " + functionName);
+ Constructor<? extends PartitionFunction> constructor = REGISTRY.get(canonicalize(functionName));
+ Preconditions.checkArgument(constructor != null, "No partition function registered for name: %s", functionName);
+ try {
+ return constructor.newInstance(numPartitions, functionConfig);
+ } catch (ReflectiveOperationException e) {
+ Throwable cause = e.getCause() != null ? e.getCause() : e;
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ }
+ throw new IllegalStateException(
+ "Failed to instantiate partition function '" + functionName + "' with " + numPartitions + " partitions",
+ cause);
}
}
@@ -108,4 +169,8 @@ public class PartitionFunctionFactory {
public static PartitionFunction getPartitionFunction(ColumnPartitionMetadata metadata) {
return getPartitionFunction(metadata.getFunctionName(), metadata.getNumPartitions(), metadata.getFunctionConfig());
}
+
+ private static String canonicalize(String name) {
+ return name.toLowerCase(Locale.ROOT);
+ }
}
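The registration and lookup flow in the factory above (probe with `(1, null)`, canonicalize names to lower case, reject a name claimed by two classes) can be sketched without classpath scanning. This is a minimal, self-contained illustration under stated assumptions: `Fn`, `ModuloFn`, `LengthFn`, `register` and `lookup` are hypothetical names, the classes are registered explicitly rather than discovered via Reflections, and none of it is the Pinot implementation.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: Fn, ModuloFn and LengthFn are stand-ins, not Pinot classes.
class RegistrySketch {
  public interface Fn {
    int getPartition(String value);

    String getName();

    default List<String> getNames() {
      return Collections.singletonList(getName());
    }
  }

  public static class ModuloFn implements Fn {
    private final int _numPartitions;

    public ModuloFn(int numPartitions, Map<String, String> functionConfig) {
      _numPartitions = numPartitions;
    }

    @Override
    public int getPartition(String value) {
      return (int) Math.floorMod(Long.parseLong(value), (long) _numPartitions);
    }

    @Override
    public String getName() {
      return "Modulo";
    }
  }

  public static class LengthFn implements Fn {
    private final int _numPartitions;

    public LengthFn(int numPartitions, Map<String, String> functionConfig) {
      _numPartitions = numPartitions;
    }

    @Override
    public int getPartition(String value) {
      return value.length() % _numPartitions;
    }

    @Override
    public String getName() {
      return "Length";
    }

    @Override
    public List<String> getNames() {
      return Arrays.asList("Length", "Len"); // canonical name plus one alias
    }
  }

  // Lower-cased name -> (int, Map) constructor.
  private static final Map<String, Constructor<? extends Fn>> REGISTRY = new HashMap<>();

  static {
    register(ModuloFn.class);
    register(LengthFn.class);
  }

  static void register(Class<? extends Fn> clazz) {
    try {
      Constructor<? extends Fn> ctor = clazz.getConstructor(int.class, Map.class);
      Fn probe = ctor.newInstance(1, null); // throwaway instance, used only to read the names
      for (String name : probe.getNames()) {
        Constructor<? extends Fn> existing = REGISTRY.put(name.toLowerCase(Locale.ROOT), ctor);
        if (existing != null && !existing.getDeclaringClass().equals(clazz)) {
          throw new IllegalStateException("Name '" + name + "' is registered to two classes");
        }
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot register " + clazz.getName(), e);
    }
  }

  static Fn lookup(String name, int numPartitions, Map<String, String> config) {
    Constructor<? extends Fn> ctor = REGISTRY.get(name.toLowerCase(Locale.ROOT));
    if (ctor == null) {
      throw new IllegalArgumentException("No partition function registered for name: " + name);
    }
    try {
      return ctor.newInstance(numPartitions, config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Failed to instantiate '" + name + "'", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(lookup("MODULO", 4, null).getPartition("10"));
    System.out.println(lookup("len", 3, null).getName());
  }
}
```

One consequence of the probe step, also noted in the `probeNames` Javadoc, is that a class whose constructor rejects a `null` config cannot report its names this way and would be skipped.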
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizer.java
new file mode 100644
index 00000000000..8f5c4fed9a4
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+
+
+/// Maps a raw signed integer hash output to a non-negative partition id in `[0, numPartitions)`.
+///
+/// [PartitionFunction] implementations apply the configured normalizer in their
+/// `getPartition(...)` body and report it via [PartitionFunction#getPartitionIdNormalizer()].
+/// The framework also uses the reported value for identity / staleness matching between
+/// config-side and segment-side partition metadata.
+public enum PartitionIdNormalizer {
+ /// Compute the remainder, then shift negative remainders into the valid range with `+ numPartitions`.
+ POSITIVE_MODULO {
+ @Override
+ int toPartitionId(int value, int numPartitions) {
+ int partition = value % numPartitions;
+ return partition < 0 ? partition + numPartitions : partition;
+ }
+
+ @Override
+ int toPartitionId(long value, int numPartitions) {
+ long partition = value % numPartitions;
+ return (int) (partition < 0 ? partition + numPartitions : partition);
+ }
+ },
+ /// Compute the remainder, then take its absolute value.
+ ABS {
+ @Override
+ int toPartitionId(int value, int numPartitions) {
+ int partition = value % numPartitions;
+ return partition < 0 ? -partition : partition;
+ }
+
+ @Override
+ int toPartitionId(long value, int numPartitions) {
+ long partition = value % numPartitions;
+ return (int) (partition < 0 ? -partition : partition);
+ }
+ },
+ /// Mask the sign bit before applying modulo.
+ MASK {
+ @Override
+ int toPartitionId(int value, int numPartitions) {
+ return (value & Integer.MAX_VALUE) % numPartitions;
+ }
+
+ @Override
+ int toPartitionId(long value, int numPartitions) {
+ return (int) ((value & Long.MAX_VALUE) % numPartitions);
+ }
+ },
+ /// Pre-modulo abs (Kafka-style) `abs(value) % numPartitions` that handles `Integer.MIN_VALUE -> 0`
+ /// (and `Long.MIN_VALUE -> 0`) to avoid the `Math.abs` overflow corner. Matches the
+ /// legacy semantics of `HashCodePartitionFunction` and `ByteArrayPartitionFunction`.
+ PRE_MODULO_ABS {
+ @Override
+ int toPartitionId(int value, int numPartitions) {
+ int abs = (value == Integer.MIN_VALUE) ? 0 : Math.abs(value);
+ return abs % numPartitions;
+ }
+
+ @Override
+ int toPartitionId(long value, int numPartitions) {
+ long abs = (value == Long.MIN_VALUE) ? 0L : Math.abs(value);
+ return (int) (abs % numPartitions);
+ }
+ },
+ /// Identity. Returns the input unchanged (narrowed to `int` for the long overload). Use only
+ /// when the upstream `PartitionFunction#getPartition` value is already guaranteed to be in
+ /// `[0, numPartitions)` — e.g. lookup-style functions like `BoundedColumnValuePartitionFunction`.
+ /// The framework does NOT validate that the input is in range; passing an out-of-range value
+ /// yields an out-of-range partition id.
+ NO_OP {
+ @Override
+ int toPartitionId(int value, int numPartitions) {
+ return value;
+ }
+
+ @Override
+ int toPartitionId(long value, int numPartitions) {
+ return (int) value;
+ }
+ };
+
+ public final int getPartitionId(int value, int numPartitions) {
+ Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+ return toPartitionId(value, numPartitions);
+ }
+
+ public final int getPartitionId(long value, int numPartitions) {
+ Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+ return toPartitionId(value, numPartitions);
+ }
+
+ public static PartitionIdNormalizer fromConfigString(String partitionIdNormalizer) {
+ Preconditions.checkArgument(partitionIdNormalizer != null && !partitionIdNormalizer.trim().isEmpty(),
+ "'partitionIdNormalizer' must not be blank");
+ try {
+ return valueOf(partitionIdNormalizer.trim().toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unsupported partitionIdNormalizer: " + partitionIdNormalizer, e);
+ }
+ }
+
+ abstract int toPartitionId(int value, int numPartitions);
+
+ abstract int toPartitionId(long value, int numPartitions);
+}
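To make the difference between the four range-folding strategies concrete, the sketch below re-implements their `int` arithmetic as plain static methods and evaluates them on the same inputs. It is an illustration only: `NormalizerSketch` and its method names are hypothetical, and the code copies the formulas shown in the enum above rather than calling the Pinot class.

```java
// Hypothetical stand-alone sketch; it copies the arithmetic in the diff but is not the Pinot enum.
class NormalizerSketch {
  // Remainder, then shift negative remainders up by n.
  static int positiveModulo(int value, int n) {
    int r = value % n;
    return r < 0 ? r + n : r;
  }

  // Remainder, then absolute value of the remainder.
  static int abs(int value, int n) {
    int r = value % n;
    return r < 0 ? -r : r;
  }

  // Clear the sign bit, then take the remainder.
  static int mask(int value, int n) {
    return (value & Integer.MAX_VALUE) % n;
  }

  // Absolute value first (MIN_VALUE patched to 0), then the remainder.
  static int preModuloAbs(int value, int n) {
    int a = (value == Integer.MIN_VALUE) ? 0 : Math.abs(value);
    return a % n;
  }

  public static void main(String[] args) {
    int n = 7;
    for (int hash : new int[] {10, -10, Integer.MIN_VALUE}) {
      System.out.printf("hash=%d positiveModulo=%d abs=%d mask=%d preModuloAbs=%d%n",
          hash, positiveModulo(hash, n), abs(hash, n), mask(hash, n), preModuloAbs(hash, n));
    }
  }
}
```

For a hash of -10 and 7 partitions the four methods yield 4, 3, 6 and 3 respectively, so the same raw hash can land in three different partitions depending on the normalizer. `abs` and `preModuloAbs` agree on every `int` except `Integer.MIN_VALUE`, where they yield 2 and 0 for 7 partitions. This is why the normalizer is part of a function's identity when config-side and segment-side metadata are compared.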
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizerTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizerTest.java
new file mode 100644
index 00000000000..b93af8b93a9
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionIdNormalizerTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+public class PartitionIdNormalizerTest {
+
+ @Test
+ public void testPositiveModulo() {
+ PartitionIdNormalizer n = PartitionIdNormalizer.POSITIVE_MODULO;
+ assertEquals(n.getPartitionId(0, 7), 0);
+ assertEquals(n.getPartitionId(10, 7), 3);
+ assertEquals(n.getPartitionId(-1, 7), 6);
+ assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), shiftedMod(Integer.MIN_VALUE, 7));
+ assertEquals(n.getPartitionId(Integer.MAX_VALUE, 1024), Integer.MAX_VALUE % 1024);
+ assertEquals(n.getPartitionId(7L, 7), 0);
+ assertEquals(n.getPartitionId(Long.MIN_VALUE, 7), shiftedMod(Long.MIN_VALUE, 7));
+ }
+
+ @Test
+ public void testAbs() {
+ PartitionIdNormalizer n = PartitionIdNormalizer.ABS;
+ assertEquals(n.getPartitionId(10, 7), 3);
+ assertEquals(n.getPartitionId(-10, 7), 3);
+ assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), Math.abs(Integer.MIN_VALUE % 7));
+ assertEquals(n.getPartitionId(-10L, 7), 3);
+ }
+
+ @Test
+ public void testMask() {
+ PartitionIdNormalizer n = PartitionIdNormalizer.MASK;
+ assertEquals(n.getPartitionId(0, 7), 0);
+ assertEquals(n.getPartitionId(-1, 7), Integer.MAX_VALUE % 7);
+ assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), 0);
+ assertEquals(n.getPartitionId(Long.MIN_VALUE, 7), 0);
+ }
+
+ @Test
+ public void testPreModuloAbs() {
+ PartitionIdNormalizer n = PartitionIdNormalizer.PRE_MODULO_ABS;
+ // Plain positive / negative: matches abs(hash) % N.
+ assertEquals(n.getPartitionId(10, 7), 10 % 7);
+ assertEquals(n.getPartitionId(-10, 7), 10 % 7);
+ // The defining corner case: Math.abs(MIN_VALUE) overflows back to MIN_VALUE, so PRE_MODULO_ABS
+ // patches that to 0 before the modulo.
+ assertEquals(n.getPartitionId(Integer.MIN_VALUE, 7), 0);
+ assertEquals(n.getPartitionId(Long.MIN_VALUE, 7), 0);
+ // For non-MIN_VALUE inputs, PRE_MODULO_ABS and (Math.abs(value) % N) agree.
+ assertEquals(n.getPartitionId(Integer.MAX_VALUE, 1024), Integer.MAX_VALUE % 1024);
+ assertEquals(n.getPartitionId(-1234L, 17), 1234 % 17);
+ }
+
+ @Test
+ public void testNoOpIsIdentity() {
+ PartitionIdNormalizer n = PartitionIdNormalizer.NO_OP;
+ // Identity for every input — the caller is asserting the value is already in [0, N).
+ assertEquals(n.getPartitionId(0, 7), 0);
+ assertEquals(n.getPartitionId(3, 7), 3);
+ // The framework does NOT validate the input is in range; passing out-of-range is the
+ // caller's responsibility.
+ assertEquals(n.getPartitionId(-5, 7), -5);
+ assertEquals(n.getPartitionId(99, 7), 99);
+ // long overload narrows to int.
+ assertEquals(n.getPartitionId(42L, 7), 42);
+ assertEquals(n.getPartitionId((long) Integer.MAX_VALUE, 1024), Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void testRangeFoldingNormalizersReturnInRange() {
+ int[] partitionCounts = {1, 2, 7, 1024};
+ int[] hashes = {0, 1, -1, 7, -7, Integer.MIN_VALUE, Integer.MAX_VALUE, 13, -13};
+ // NO_OP is excluded — it's identity and only safe when the caller already produced an
+ // in-range value. The other four normalizers fold every signed input into [0, N).
+ for (PartitionIdNormalizer n : PartitionIdNormalizer.values()) {
+ if (n == PartitionIdNormalizer.NO_OP) {
+ continue;
+ }
+ for (int p : partitionCounts) {
+ for (int h : hashes) {
+ int pid = n.getPartitionId(h, p);
+ assertTrue(pid >= 0 && pid < p,
+ n + " produced out-of-range partition " + pid + " for hash=" + h + ", N=" + p);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testFromConfigStringRoundTrip() {
+ for (PartitionIdNormalizer n : PartitionIdNormalizer.values()) {
+ assertEquals(PartitionIdNormalizer.fromConfigString(n.name()), n);
+ assertEquals(PartitionIdNormalizer.fromConfigString(n.name().toLowerCase()), n);
+ assertEquals(PartitionIdNormalizer.fromConfigString(" " + n.name() + " "), n);
+ }
+ }
+
+ @Test
+ public void testFromConfigStringRejectsBlank() {
+ expectThrows(IllegalArgumentException.class, () -> PartitionIdNormalizer.fromConfigString(null));
+ expectThrows(IllegalArgumentException.class, () -> PartitionIdNormalizer.fromConfigString(""));
+ expectThrows(IllegalArgumentException.class, () -> PartitionIdNormalizer.fromConfigString(" "));
+ }
+
+ @Test
+ public void testFromConfigStringRejectsUnknown() {
+ IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+ () -> PartitionIdNormalizer.fromConfigString("not-a-real-normalizer"));
+ assertTrue(e.getMessage().contains("not-a-real-normalizer"));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testNonPositivePartitionsRejected() {
+ PartitionIdNormalizer.POSITIVE_MODULO.getPartitionId(5, 0);
+ }
+
+ private static int shiftedMod(long value, int numPartitions) {
+ long m = value % numPartitions;
+ return (int) (m < 0 ? m + numPartitions : m);
+ }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 3e66c49b61e..9bc1da9edc7 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -46,6 +46,7 @@ import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
import org.apache.pinot.query.runtime.SendStatsPredicate;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.server.access.AccessControlFactory;
import org.apache.pinot.server.access.AllowAllAccessFactory;
@@ -107,10 +108,11 @@ public class ServerInstance {
_instanceDataManager.init(serverConf.getInstanceDataManagerConfig(), helixManager, _serverMetrics,
segmentOperationsThrottlerSet, _reloadJobStatusCache);
- // Initialize ServerQueryLogger and FunctionRegistry before starting the query executor
+ // Initialize ServerQueryLogger, FunctionRegistry and PartitionFunctionFactory before starting the query executor
ServerQueryLogger.init(serverConf.getQueryLogMaxRate(), serverConf.getQueryLogDroppedReportMaxRate(),
_serverMetrics);
FunctionRegistry.init();
+ PartitionFunctionFactory.init();
String queryExecutorClassName = serverConf.getQueryExecutorClassName();
LOGGER.info("Initializing query executor of class: {}", queryExecutorClassName);
_queryExecutor = PluginManager.get().createInstance(queryExecutorClassName);