This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ac21483dd [common] Support Protobuf and RPC for Predicates (#2502)
ac21483dd is described below
commit ac21483dd7b9f498ea839e459c952b8bf3a2076a
Author: Giannis Polyzos <[email protected]>
AuthorDate: Mon Mar 16 07:56:04 2026 +0100
[common] Support Protobuf and RPC for Predicates (#2502)
---
.../fluss/rpc/util/PredicateMessageUtils.java | 572 +++++++++++++++
fluss-rpc/src/main/proto/FlussApi.proto | 66 ++
.../fluss/rpc/util/PredicateMessageUtilsTest.java | 781 +++++++++++++++++++++
3 files changed, 1419 insertions(+)
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java
new file mode 100644
index 000000000..3eb138e6f
--- /dev/null
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.util;
+
+import org.apache.fluss.predicate.And;
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.Contains;
+import org.apache.fluss.predicate.EndsWith;
+import org.apache.fluss.predicate.Equal;
+import org.apache.fluss.predicate.GreaterOrEqual;
+import org.apache.fluss.predicate.GreaterThan;
+import org.apache.fluss.predicate.In;
+import org.apache.fluss.predicate.IsNotNull;
+import org.apache.fluss.predicate.IsNull;
+import org.apache.fluss.predicate.LeafFunction;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.LessOrEqual;
+import org.apache.fluss.predicate.LessThan;
+import org.apache.fluss.predicate.NotEqual;
+import org.apache.fluss.predicate.NotIn;
+import org.apache.fluss.predicate.Or;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateVisitor;
+import org.apache.fluss.predicate.StartsWith;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.rpc.messages.PbCompoundPredicate;
+import org.apache.fluss.rpc.messages.PbLeafPredicate;
+import org.apache.fluss.rpc.messages.PbLiteralValue;
+import org.apache.fluss.rpc.messages.PbPredicate;
+import org.apache.fluss.types.DataField;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.RowType;
+
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utils for converting Predicate to PbPredicate and vice versa. */
+public class PredicateMessageUtils {
+
+ //
-------------------------------------------------------------------------
+ // Deserialization: PbPredicate -> Predicate
+ //
-------------------------------------------------------------------------
+
+ public static Predicate toPredicate(PbPredicate pbPredicate, RowType
rowType) {
+ PredicateType type = PredicateType.fromValue(pbPredicate.getType());
+ switch (type) {
+ case LEAF:
+ return toLeafPredicate(pbPredicate.getLeaf(), rowType);
+ case COMPOUND:
+ return toCompoundPredicate(pbPredicate.getCompound(), rowType);
+ default:
+ throw new IllegalArgumentException("Unknown predicate type: "
+ type);
+ }
+ }
+
+ public static CompoundPredicate toCompoundPredicate(
+ PbCompoundPredicate pbCompound, RowType rowType) {
+ List<Predicate> children =
+ pbCompound.getChildrensList().stream()
+ .map(child -> toPredicate(child, rowType))
+ .collect(Collectors.toList());
+ return new CompoundPredicate(
+
CompoundFunctionCode.fromValue(pbCompound.getFunction()).getFunction(),
children);
+ }
+
+ private static LeafPredicate toLeafPredicate(PbLeafPredicate pbLeaf,
RowType rowType) {
+ ResolvedField resolvedField = resolveField(rowType,
pbLeaf.getFieldId());
+ DataType fieldType = resolvedField.field.getType();
+ String fieldName = resolvedField.field.getName();
+ List<Object> literals =
+ pbLeaf.getLiteralsList().stream()
+ .map(lit -> toLiteralValue(lit, fieldType))
+ .collect(Collectors.toList());
+
+ return new LeafPredicate(
+ LeafFunctionCode.fromValue(pbLeaf.getFunction()).getFunction(),
+ fieldType,
+ resolvedField.index,
+ fieldName,
+ literals);
+ }
+
+ private static Object toLiteralValue(PbLiteralValue pbLiteral, DataType
fieldType) {
+ if (pbLiteral.isIsNull()) {
+ return null;
+ }
+ DataTypeRoot root =
+
DataTypeRootCode.fromValue(pbLiteral.getLiteralType()).getDataTypeRoot();
+ validateLiteralType(fieldType, root);
+ switch (root) {
+ case BOOLEAN:
+ return pbLiteral.isBooleanValue();
+ case TINYINT:
+ return (byte) pbLiteral.getIntValue();
+ case SMALLINT:
+ return (short) pbLiteral.getIntValue();
+ case INTEGER:
+ return pbLiteral.getIntValue();
+ case BIGINT:
+ return pbLiteral.getBigintValue();
+ case FLOAT:
+ return pbLiteral.getFloatValue();
+ case DOUBLE:
+ return pbLiteral.getDoubleValue();
+ case CHAR:
+ case STRING:
+ String stringValue = pbLiteral.getStringValue();
+ return stringValue == null ? null :
BinaryString.fromString(stringValue);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) fieldType;
+ if (pbLiteral.hasDecimalBytes()) {
+ return Decimal.fromUnscaledBytes(
+ pbLiteral.getDecimalBytes(),
+ decimalType.getPrecision(),
+ decimalType.getScale());
+ } else {
+ return Decimal.fromUnscaledLong(
+ pbLiteral.getDecimalValue(),
+ decimalType.getPrecision(),
+ decimalType.getScale());
+ }
+ case DATE:
+ return LocalDate.ofEpochDay(pbLiteral.getBigintValue());
+ case TIME_WITHOUT_TIME_ZONE:
+ return LocalTime.ofNanoOfDay(pbLiteral.getIntValue() *
1_000_000L);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TimestampNtz.fromMillis(
+ pbLiteral.getTimestampMillisValue(),
+ pbLiteral.getTimestampNanoOfMillisValue());
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TimestampLtz.fromEpochMillis(
+ pbLiteral.getTimestampMillisValue(),
+ pbLiteral.getTimestampNanoOfMillisValue());
+ case BINARY:
+ case BYTES:
+ return pbLiteral.getBinaryValue();
+ default:
+ throw new IllegalArgumentException("Unknown literal value
type: " + root);
+ }
+ }
+
+ private static void validateLiteralType(DataType fieldType, DataTypeRoot
literalRoot) {
+ DataTypeRoot fieldRoot = fieldType.getTypeRoot();
+ if (fieldRoot == literalRoot) {
+ return;
+ }
+ if (isCharacterString(fieldRoot) && isCharacterString(literalRoot)) {
+ return;
+ }
+ if (isBinaryString(fieldRoot) && isBinaryString(literalRoot)) {
+ return;
+ }
+ throw new IllegalArgumentException(
+ "Literal type "
+ + literalRoot
+ + " does not match target schema field type "
+ + fieldRoot
+ + ".");
+ }
+
+ private static boolean isCharacterString(DataTypeRoot root) {
+ return root == DataTypeRoot.CHAR || root == DataTypeRoot.STRING;
+ }
+
+ private static boolean isBinaryString(DataTypeRoot root) {
+ return root == DataTypeRoot.BINARY || root == DataTypeRoot.BYTES;
+ }
+
+ //
-------------------------------------------------------------------------
+ // Serialization: Predicate -> PbPredicate
+ //
-------------------------------------------------------------------------
+
+ public static PbPredicate toPbPredicate(Predicate predicate, RowType
rowType) {
+ return predicate.visit(
+ new PredicateVisitor<PbPredicate>() {
+ @Override
+ public PbPredicate visit(LeafPredicate predicate) {
+ DataField field = resolveSourceField(predicate,
rowType);
+ if (field.getFieldId() < 0) {
+ throw new IllegalArgumentException(
+ "Field "
+ + field.getName()
+ + " at index "
+ + predicate.index()
+ + " does not have a valid field
id.");
+ }
+ PbLeafPredicate pbLeaf = new PbLeafPredicate();
+ pbLeaf.setFunction(
+
LeafFunctionCode.fromFunction(predicate.function()).getValue());
+ pbLeaf.setFieldId(field.getFieldId());
+
+ List<PbLiteralValue> literals = new ArrayList<>();
+ for (Object literal : predicate.literals()) {
+ literals.add(toPbLiteralValue(field.getType(),
literal));
+ }
+ pbLeaf.addAllLiterals(literals);
+
+ PbPredicate pbPredicate = new PbPredicate();
+ pbPredicate.setType(PredicateType.LEAF.getValue());
+ pbPredicate.setLeaf(pbLeaf);
+ return pbPredicate;
+ }
+
+ @Override
+ public PbPredicate visit(CompoundPredicate predicate) {
+ PbCompoundPredicate pbCompound = new
PbCompoundPredicate();
+ pbCompound.setFunction(
+
CompoundFunctionCode.fromFunction(predicate.function()).getValue());
+ pbCompound.addAllChildrens(
+ predicate.children().stream()
+ .map(child -> toPbPredicate(child,
rowType))
+ .collect(Collectors.toList()));
+
+ PbPredicate pbPredicate = new PbPredicate();
+ pbPredicate.setType(PredicateType.COMPOUND.getValue());
+ pbPredicate.setCompound(pbCompound);
+ return pbPredicate;
+ }
+ });
+ }
+
+ private static PbLiteralValue toPbLiteralValue(DataType type, Object
literal) {
+ PbLiteralValue pbLiteral = new PbLiteralValue();
+
pbLiteral.setLiteralType(DataTypeRootCode.fromDataTypeRoot(type.getTypeRoot()).getValue());
+ if (literal == null) {
+ pbLiteral.setIsNull(true);
+ return pbLiteral;
+ }
+ pbLiteral.setIsNull(false);
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case STRING:
+ pbLiteral.setStringValue(literal.toString());
+ break;
+ case BOOLEAN:
+ pbLiteral.setBooleanValue((Boolean) literal);
+ break;
+ case BINARY:
+ case BYTES:
+ pbLiteral.setBinaryValue((byte[]) literal);
+ break;
+ case DECIMAL:
+ Decimal decimal = (Decimal) literal;
+ if (decimal.isCompact()) {
+ pbLiteral.setDecimalValue(decimal.toUnscaledLong());
+ } else {
+ pbLiteral.setDecimalBytes(decimal.toUnscaledBytes());
+ }
+ break;
+ case TINYINT:
+ pbLiteral.setIntValue((Byte) literal);
+ break;
+ case SMALLINT:
+ pbLiteral.setIntValue((Short) literal);
+ break;
+ case INTEGER:
+ pbLiteral.setIntValue((Integer) literal);
+ break;
+ case DATE:
+ pbLiteral.setBigintValue(((LocalDate) literal).toEpochDay());
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ pbLiteral.setIntValue((int) (((LocalTime)
literal).toNanoOfDay() / 1_000_000L));
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ pbLiteral.setTimestampMillisValue(((TimestampNtz)
literal).getMillisecond());
+ pbLiteral.setTimestampNanoOfMillisValue(
+ ((TimestampNtz) literal).getNanoOfMillisecond());
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ pbLiteral.setTimestampMillisValue(((TimestampLtz)
literal).getEpochMillisecond());
+ pbLiteral.setTimestampNanoOfMillisValue(
+ ((TimestampLtz) literal).getNanoOfMillisecond());
+ break;
+ case BIGINT:
+ pbLiteral.setBigintValue((Long) literal);
+ break;
+ case FLOAT:
+ pbLiteral.setFloatValue((Float) literal);
+ break;
+ case DOUBLE:
+ pbLiteral.setDoubleValue((Double) literal);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown data type: " +
type.getTypeRoot());
+ }
+ return pbLiteral;
+ }
+
+ private static DataField resolveSourceField(LeafPredicate predicate,
RowType rowType) {
+ if (predicate.index() < 0 || predicate.index() >=
rowType.getFieldCount()) {
+ throw new IllegalArgumentException(
+ "Predicate field index "
+ + predicate.index()
+ + " is out of bounds for row type with "
+ + rowType.getFieldCount()
+ + " fields.");
+ }
+ DataField field = rowType.getFields().get(predicate.index());
+ if (!field.getName().equals(predicate.fieldName())) {
+ throw new IllegalArgumentException(
+ "Predicate field name "
+ + predicate.fieldName()
+ + " does not match schema field "
+ + field.getName()
+ + " at index "
+ + predicate.index()
+ + ".");
+ }
+ if (!field.getType().equals(predicate.type())) {
+ throw new IllegalArgumentException(
+ "Predicate field type "
+ + predicate.type()
+ + " does not match schema field type "
+ + field.getType()
+ + " for field "
+ + field.getName()
+ + ".");
+ }
+ return field;
+ }
+
+ private static ResolvedField resolveField(RowType rowType, int fieldId) {
+ List<DataField> fields = rowType.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ DataField field = fields.get(i);
+ if (field.getFieldId() == fieldId) {
+ return new ResolvedField(i, field);
+ }
+ }
+ throw new IllegalArgumentException(
+ "Cannot resolve field id " + fieldId + " from row type.");
+ }
+
+ //
-------------------------------------------------------------------------
+ // Proto int32 <-> domain object mapping enums
+ //
-------------------------------------------------------------------------
+
+ /** Maps PbPredicate.type int32 values to predicate kinds. */
+ private enum PredicateType {
+ LEAF(0),
+ COMPOUND(1);
+
+ private final int value;
+ private static final PredicateType[] VALUES = new PredicateType[2];
+
+ static {
+ for (PredicateType t : values()) {
+ VALUES[t.value] = t;
+ }
+ }
+
+ PredicateType(int value) {
+ this.value = value;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ static PredicateType fromValue(int value) {
+ if (value < 0 || value >= VALUES.length) {
+ throw new IllegalArgumentException("Unknown predicate type: "
+ value);
+ }
+ return VALUES[value];
+ }
+ }
+
+ /** Maps PbLeafPredicate.function int32 values to {@link LeafFunction}
instances. */
+ private enum LeafFunctionCode {
+ EQUAL(0, Equal.INSTANCE),
+ NOT_EQUAL(1, NotEqual.INSTANCE),
+ LESS_THAN(2, LessThan.INSTANCE),
+ LESS_OR_EQUAL(3, LessOrEqual.INSTANCE),
+ GREATER_THAN(4, GreaterThan.INSTANCE),
+ GREATER_OR_EQUAL(5, GreaterOrEqual.INSTANCE),
+ IS_NULL(6, IsNull.INSTANCE),
+ IS_NOT_NULL(7, IsNotNull.INSTANCE),
+ STARTS_WITH(8, StartsWith.INSTANCE),
+ CONTAINS(9, Contains.INSTANCE),
+ END_WITH(10, EndsWith.INSTANCE),
+ IN(11, In.INSTANCE),
+ NOT_IN(12, NotIn.INSTANCE);
+
+ private final int value;
+ private final LeafFunction function;
+ private static final LeafFunctionCode[] VALUES = new
LeafFunctionCode[13];
+ private static final Map<Class<? extends LeafFunction>,
LeafFunctionCode> FUNCTION_MAP =
+ new HashMap<>();
+
+ static {
+ for (LeafFunctionCode c : values()) {
+ VALUES[c.value] = c;
+ FUNCTION_MAP.put(c.function.getClass(), c);
+ }
+ }
+
+ LeafFunctionCode(int value, LeafFunction function) {
+ this.value = value;
+ this.function = function;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ LeafFunction getFunction() {
+ return function;
+ }
+
+ static LeafFunctionCode fromValue(int value) {
+ if (value < 0 || value >= VALUES.length) {
+ throw new IllegalArgumentException("Unknown leaf function: " +
value);
+ }
+ return VALUES[value];
+ }
+
+ static LeafFunctionCode fromFunction(LeafFunction function) {
+ LeafFunctionCode c = FUNCTION_MAP.get(function.getClass());
+ if (c == null) {
+ throw new IllegalArgumentException("Unknown leaf function: " +
function);
+ }
+ return c;
+ }
+ }
+
+ /** Maps PbCompoundPredicate.function int32 values to {@link
CompoundPredicate.Function}. */
+ private enum CompoundFunctionCode {
+ AND(0, And.INSTANCE),
+ OR(1, Or.INSTANCE);
+
+ private final int value;
+ private final CompoundPredicate.Function function;
+ private static final CompoundFunctionCode[] VALUES = new
CompoundFunctionCode[2];
+ private static final Map<Class<? extends CompoundPredicate.Function>,
CompoundFunctionCode>
+ FUNCTION_MAP = new HashMap<>();
+
+ static {
+ for (CompoundFunctionCode c : values()) {
+ VALUES[c.value] = c;
+ FUNCTION_MAP.put(c.function.getClass(), c);
+ }
+ }
+
+ CompoundFunctionCode(int value, CompoundPredicate.Function function) {
+ this.value = value;
+ this.function = function;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ CompoundPredicate.Function getFunction() {
+ return function;
+ }
+
+ static CompoundFunctionCode fromValue(int value) {
+ if (value < 0 || value >= VALUES.length) {
+ throw new IllegalArgumentException("Unknown compound function:
" + value);
+ }
+ return VALUES[value];
+ }
+
+ static CompoundFunctionCode fromFunction(CompoundPredicate.Function
function) {
+ CompoundFunctionCode c = FUNCTION_MAP.get(function.getClass());
+ if (c == null) {
+ throw new IllegalArgumentException("Unknown compound function:
" + function);
+ }
+ return c;
+ }
+ }
+
+ /**
+ * Maps PbLiteralValue.literal_type int32 values to {@link DataTypeRoot}.
+ *
+ * <p>Note: proto uses INT/VARCHAR while the domain model uses
INTEGER/STRING.
+ */
+ private enum DataTypeRootCode {
+ BOOLEAN(0, DataTypeRoot.BOOLEAN),
+ TINYINT(1, DataTypeRoot.TINYINT),
+ SMALLINT(2, DataTypeRoot.SMALLINT),
+ INT(3, DataTypeRoot.INTEGER),
+ BIGINT(4, DataTypeRoot.BIGINT),
+ FLOAT(5, DataTypeRoot.FLOAT),
+ DOUBLE(6, DataTypeRoot.DOUBLE),
+ CHAR(7, DataTypeRoot.CHAR),
+ VARCHAR(8, DataTypeRoot.STRING),
+ DECIMAL(9, DataTypeRoot.DECIMAL),
+ DATE(10, DataTypeRoot.DATE),
+ TIME_WITHOUT_TIME_ZONE(11, DataTypeRoot.TIME_WITHOUT_TIME_ZONE),
+ TIMESTAMP_WITHOUT_TIME_ZONE(12,
DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE),
+ TIMESTAMP_WITH_LOCAL_TIME_ZONE(13,
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
+ BINARY(14, DataTypeRoot.BINARY),
+ BYTES(15, DataTypeRoot.BYTES);
+
+ private final int value;
+ private final DataTypeRoot dataTypeRoot;
+ private static final DataTypeRootCode[] VALUES = new
DataTypeRootCode[16];
+ private static final Map<DataTypeRoot, DataTypeRootCode> ROOT_MAP =
new HashMap<>();
+
+ static {
+ for (DataTypeRootCode c : values()) {
+ VALUES[c.value] = c;
+ ROOT_MAP.put(c.dataTypeRoot, c);
+ }
+ }
+
+ DataTypeRootCode(int value, DataTypeRoot dataTypeRoot) {
+ this.value = value;
+ this.dataTypeRoot = dataTypeRoot;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ DataTypeRoot getDataTypeRoot() {
+ return dataTypeRoot;
+ }
+
+ static DataTypeRootCode fromValue(int value) {
+ if (value < 0 || value >= VALUES.length) {
+ throw new IllegalArgumentException("Unknown data type root: "
+ value);
+ }
+ return VALUES[value];
+ }
+
+ static DataTypeRootCode fromDataTypeRoot(DataTypeRoot root) {
+ DataTypeRootCode c = ROOT_MAP.get(root);
+ if (c == null) {
+ throw new IllegalArgumentException("Unknown data type root: "
+ root);
+ }
+ return c;
+ }
+ }
+
+ private static final class ResolvedField {
+ private final int index;
+ private final DataField field;
+
+ private ResolvedField(int index, DataField field) {
+ this.index = index;
+ this.field = field;
+ }
+ }
+}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index b14238861..20b699fb1 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -1240,4 +1240,70 @@ message PbTableStatsRespForBucket {
// Per-column statistics, keyed by column index.
// Only populated when target_columns is specified in the request.
// repeated PbColumnStats column_stats = 10;
+}
+
+/**
+* Definition of predicate system related message
+*/
+
+// PbPredicateType constants (using int32 instead of enum for proto3
compatibility)
+// LEAF = 0, COMPOUND = 1
+
+// Represents a predicate that can be serialized and transmitted across
+// languages
+message PbPredicate {
+ // See PbPredicateType constants: LEAF = 0, COMPOUND = 1
+ required int32 type = 1;
+ optional PbLeafPredicate leaf = 2;
+ optional PbCompoundPredicate compound = 3;
+}
+
+// Represents a leaf predicate that compares a field with literals
+message PbLeafPredicate {
+ // The function to apply (see PbLeafFunction constants: EQUAL=0 ...
NOT_IN=12)
+ required int32 function = 1;
+ // The schema field id of the referenced top-level field
+ required int32 field_id = 2;
+ // The literals to compare with
+ repeated PbLiteralValue literals = 3;
+}
+
+// Represents a compound predicate that combines multiple predicates
+message PbCompoundPredicate {
+ // The function to apply (see PbCompoundFunction constants: AND=0, OR=1)
+ required int32 function = 1;
+ // The child predicates
+ repeated PbPredicate children = 2;
+}
+
+// PbLeafFunction constants (using int32 instead of enum for proto3
compatibility)
+// EQUAL=0, NOT_EQUAL=1, LESS_THAN=2, LESS_OR_EQUAL=3, GREATER_THAN=4,
+// GREATER_OR_EQUAL=5, IS_NULL=6, IS_NOT_NULL=7, STARTS_WITH=8, CONTAINS=9,
+// END_WITH=10, IN=11, NOT_IN=12
+
+// PbCompoundFunction constants (using int32 instead of enum for proto3
compatibility)
+// AND=0, OR=1
+
+// PbDataTypeRoot constants (using int32 instead of enum for proto3
compatibility)
+// BOOLEAN=0, TINYINT=1, SMALLINT=2, INT=3, BIGINT=4, FLOAT=5, DOUBLE=6,
+// CHAR=7, VARCHAR=8, DECIMAL=9, DATE=10, TIME_WITHOUT_TIME_ZONE=11,
+// TIMESTAMP_WITHOUT_TIME_ZONE=12, TIMESTAMP_WITH_LOCAL_TIME_ZONE=13,
+// BINARY=14, BYTES=15
+
+// Represents a literal value
+message PbLiteralValue {
+ // See PbDataTypeRoot constants: indicates which value field is populated
+ required int32 literal_type = 1;
+ required bool is_null = 2;
+ optional bool boolean_value = 3;
+ optional int32 int_value = 4;
+ optional int64 bigint_value = 5;
+ optional float float_value = 6;
+ optional double double_value = 7;
+ optional string string_value = 8;
+ optional bytes binary_value = 9;
+ optional int64 decimal_value = 10; // Serialized decimal (compact mode)
+ optional int64 timestamp_millis_value = 11; // Epoch millis
+ optional int32 timestamp_nano_of_millis_value = 12; // Nano of millis
+ optional bytes decimal_bytes = 13; // Serialized decimal (non-compact mode)
}
\ No newline at end of file
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java
new file mode 100644
index 000000000..8e1e66f76
--- /dev/null
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.util;
+
+import org.apache.fluss.predicate.And;
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.Equal;
+import org.apache.fluss.predicate.GreaterThan;
+import org.apache.fluss.predicate.In;
+import org.apache.fluss.predicate.IsNull;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.LessThan;
+import org.apache.fluss.predicate.Or;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.rpc.messages.PbPredicate;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataField;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** PredicateMessageUtilsTest. */
+public class PredicateMessageUtilsTest {
+
+ /**
+ * Builds a RowType that covers all field indices used by the given
predicates. Positions not
+ * covered by any predicate are filled with a nullable IntType
placeholder. Each field gets a
+ * stable, non-ordinal field id so tests validate field-id based
serialization.
+ */
+ private static RowType buildRowType(List<LeafPredicate> predicates) {
+ int maxIndex = 0;
+ for (LeafPredicate p : predicates) {
+ if (p.index() > maxIndex) {
+ maxIndex = p.index();
+ }
+ }
+ DataField[] fields = new DataField[maxIndex + 1];
+ for (int i = 0; i <= maxIndex; i++) {
+ fields[i] = new DataField("_placeholder_" + i, new IntType(true),
100 + i * 10);
+ }
+ for (LeafPredicate p : predicates) {
+ fields[p.index()] = new DataField(p.fieldName(), p.type(), 100 +
p.index() * 10);
+ }
+ return new RowType(Arrays.asList(fields));
+ }
+
+ private static RowType buildRowType(LeafPredicate... predicates) {
+ return buildRowType(Arrays.asList(predicates));
+ }
+
+ @Test
+ public void testLeafPredicateIntEqual() {
+ DataType type = new IntType(false);
+ LeafPredicate predicate =
+ new LeafPredicate(Equal.INSTANCE, type, 0, "id",
Collections.singletonList(123));
+ RowType rowType = buildRowType(predicate);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+
assertThat(pb.getLeaf().getFieldId()).isEqualTo(rowType.getFields().get(0).getFieldId());
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(Equal.INSTANCE);
+ assertThat(lp.fieldName()).isEqualTo("id");
+ assertThat(lp.literals().get(0)).isEqualTo(123);
+ }
+
+ @Test
+ public void testLeafPredicateStringIn() {
+ DataType type = new StringType(true);
+ List<Object> values =
+ Arrays.asList(
+ BinaryString.fromString("foo"),
+ BinaryString.fromString("bar"),
+ BinaryString.fromString("baz"));
+ LeafPredicate predicate = new LeafPredicate(In.INSTANCE, type, 1,
"name", values);
+ RowType rowType = buildRowType(predicate);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(In.INSTANCE);
+ assertThat(lp.fieldName()).isEqualTo("name");
+ assertThat(lp.literals()).isEqualTo(values);
+ }
+
+ @Test
+ public void testLeafPredicateIsNull() {
+ DataType type = new IntType(true);
+ LeafPredicate predicate =
+ new LeafPredicate(IsNull.INSTANCE, type, 2, "age",
Collections.emptyList());
+ RowType rowType = buildRowType(predicate);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(IsNull.INSTANCE);
+ assertThat(lp.fieldName()).isEqualTo("age");
+ }
+
+ @Test
+ public void testLeafPredicateDecimal() {
+ DataType type = new DecimalType(false, 10, 2);
+ Decimal decimal = Decimal.fromBigDecimal(new BigDecimal("1234.56"),
10, 2);
+ LeafPredicate predicate =
+ new LeafPredicate(
+ Equal.INSTANCE, type, 3, "amount",
Collections.singletonList(decimal));
+ RowType rowType = buildRowType(predicate);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(Equal.INSTANCE);
+ assertThat(lp.fieldName()).isEqualTo("amount");
+ assertThat(lp.literals()).hasSize(1);
+ assertThat(((Decimal) lp.literals().get(0)).toBigDecimal())
+ .isEqualByComparingTo(decimal.toBigDecimal());
+ assertThat(((Decimal)
lp.literals().get(0)).precision()).isEqualTo(decimal.precision());
+ assertThat(((Decimal)
lp.literals().get(0)).scale()).isEqualTo(decimal.scale());
+ }
+
+ @Test
+ public void testLeafPredicateTimestamp() {
+ DataType type = new TimestampType(false, 3);
+ TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3);
+ LeafPredicate predicate =
+ new LeafPredicate(
+ GreaterThan.INSTANCE, type, 4, "ts",
Collections.singletonList(ts));
+ RowType rowType = buildRowType(predicate);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(GreaterThan.INSTANCE);
+ assertThat(lp.fieldName()).isEqualTo("ts");
+ assertThat(lp.literals()).hasSize(1);
+ assertThat(((TimestampNtz) lp.literals().get(0)).getMillisecond())
+ .isEqualTo(ts.getMillisecond());
+ assertThat(((TimestampNtz)
lp.literals().get(0)).getNanoOfMillisecond())
+ .isEqualTo(ts.getNanoOfMillisecond());
+ }
+
+ @Test
+ public void testLeafPredicateBoolean() {
+ DataType type = new BooleanType(false);
+ LeafPredicate predicate =
+ new LeafPredicate(Equal.INSTANCE, type, 5, "flag",
Collections.singletonList(true));
+ RowType rowType = buildRowType(predicate);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(Equal.INSTANCE);
+ assertThat(lp.fieldName()).isEqualTo("flag");
+ assertThat(lp.literals().get(0)).isEqualTo(true);
+ }
+
+ @Test
+ public void testCompoundPredicateAnd() {
+ DataType type = new IntType(false);
+ LeafPredicate p1 =
+ new LeafPredicate(
+ GreaterThan.INSTANCE, type, 0, "id",
Collections.singletonList(10));
+ LeafPredicate p2 =
+ new LeafPredicate(LessThan.INSTANCE, type, 0, "id",
Collections.singletonList(100));
+ CompoundPredicate andPredicate = new CompoundPredicate(And.INSTANCE,
Arrays.asList(p1, p2));
+ RowType rowType = buildRowType(p1, p2);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(andPredicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(CompoundPredicate.class);
+ CompoundPredicate cp = (CompoundPredicate) result;
+ assertThat(cp.function()).isEqualTo(And.INSTANCE);
+ assertThat(cp.children()).hasSize(2);
+ }
+
+ @Test
+ public void testCompoundPredicateOrNested() {
+ DataType type = new StringType(false);
+ LeafPredicate p1 =
+ new LeafPredicate(
+ Equal.INSTANCE, type, 0, "name",
Collections.singletonList("foo"));
+ LeafPredicate p2 =
+ new LeafPredicate(
+ Equal.INSTANCE, type, 0, "name",
Collections.singletonList("bar"));
+ CompoundPredicate orPredicate = new CompoundPredicate(Or.INSTANCE,
Arrays.asList(p1, p2));
+ CompoundPredicate andPredicate =
+ new CompoundPredicate(And.INSTANCE, Arrays.asList(orPredicate,
p1));
+ RowType rowType = buildRowType(p1, p2);
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(andPredicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(CompoundPredicate.class);
+ CompoundPredicate cp = (CompoundPredicate) result;
+ assertThat(cp.function()).isEqualTo(And.INSTANCE);
+ assertThat(cp.children()).hasSize(2);
+ assertThat(cp.children().get(0)).isInstanceOf(CompoundPredicate.class);
+ CompoundPredicate orCp = (CompoundPredicate) cp.children().get(0);
+ assertThat(orCp.function()).isEqualTo(Or.INSTANCE);
+ }
+
+ @Test
+ public void testPbLiteralSerde() {
+ // boolean
+ DataType boolType = new BooleanType(false);
+ LeafPredicate boolPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE, boolType, 0, "f_bool",
Collections.singletonList(true));
+ // int
+ DataType intType = new IntType(false);
+ LeafPredicate intPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE, intType, 1, "f_int",
Collections.singletonList(123));
+ // bigint
+ DataType bigIntType = new BigIntType(false);
+ LeafPredicate bigIntPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ bigIntType,
+ 2,
+ "f_bigint",
+ Collections.singletonList(1234567890123L));
+ // float
+ DataType floatType = new FloatType(false);
+ LeafPredicate floatPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE, floatType, 3, "f_float",
Collections.singletonList(1.23f));
+ // double
+ DataType doubleType = new DoubleType(false);
+ LeafPredicate doublePredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ doubleType,
+ 4,
+ "f_double",
+ Collections.singletonList(2.34d));
+ // char
+ DataType charType = new CharType(false, 5);
+ LeafPredicate charPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ charType,
+ 5,
+ "f_char",
+
Collections.singletonList(BinaryString.fromString("abcde")));
+ // string
+ DataType stringType = new StringType(false);
+ LeafPredicate stringPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ stringType,
+ 6,
+ "f_string",
+
Collections.singletonList(BinaryString.fromString("hello")));
+ // decimal
+ DataType decimalType = new DecimalType(false, 10, 2);
+ Decimal decimal = Decimal.fromBigDecimal(new BigDecimal("1234.56"),
10, 2);
+ LeafPredicate decimalPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ decimalType,
+ 7,
+ "f_decimal",
+ Collections.singletonList(decimal));
+ // date
+ DataType dateType = new DateType(false);
+ LeafPredicate datePredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ dateType,
+ 8,
+ "f_date",
+ Collections.singletonList(
+ LocalDate.ofEpochDay(19000L))); // days since
epoch
+ // time
+ DataType timeType = new TimeType(false, 3);
+ LocalTime time = LocalTime.of(12, 30);
+ LeafPredicate timePredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ timeType,
+ 9,
+ "f_time",
+ Collections.singletonList(time)); // millis of day
+ // timestamp
+ DataType tsType = new TimestampType(false, 3);
+ TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3);
+ LeafPredicate tsPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE, tsType, 10, "f_ts",
Collections.singletonList(ts));
+ // timestamp_ltz
+ DataType ltzType = new LocalZonedTimestampType(false, 3);
+ TimestampLtz ltz = TimestampLtz.fromEpochMillis(1680000000000L, 3);
+ LeafPredicate ltzPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE, ltzType, 11, "f_ltz",
Collections.singletonList(ltz));
+ // binary
+ DataType binaryType = new BinaryType(4);
+ LeafPredicate binaryPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ binaryType,
+ 12,
+ "f_binary",
+ Collections.singletonList(new byte[] {1, 2, 3, 4}));
+ // bytes
+ DataType bytesType = new BytesType(false);
+ LeafPredicate bytesPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ bytesType,
+ 13,
+ "f_bytes",
+ Collections.singletonList(new byte[] {5, 6, 7, 8}));
+ // tinyint
+ DataType tinyIntType = new TinyIntType(false);
+ LeafPredicate tinyIntPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ tinyIntType,
+ 14,
+ "f_tinyint",
+ Collections.singletonList((byte) 7));
+ // smallint
+ DataType smallIntType = new SmallIntType(false);
+ LeafPredicate smallIntPredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ smallIntType,
+ 15,
+ "f_smallint",
+ Collections.singletonList((short) 1234));
+
+ List<LeafPredicate> predicates =
+ Arrays.asList(
+ boolPredicate,
+ intPredicate,
+ bigIntPredicate,
+ floatPredicate,
+ doublePredicate,
+ charPredicate,
+ stringPredicate,
+ decimalPredicate,
+ datePredicate,
+ timePredicate,
+ tsPredicate,
+ ltzPredicate,
+ binaryPredicate,
+ bytesPredicate,
+ tinyIntPredicate,
+ smallIntPredicate);
+ RowType rowType = buildRowType(predicates);
+ for (LeafPredicate predicate : predicates) {
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ assertThat(pb.getLeaf().getFieldId())
+
.isEqualTo(rowType.getFields().get(predicate.index()).getFieldId());
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(predicate.function());
+ assertThat(lp.fieldName()).isEqualTo(predicate.fieldName());
+ assertThat(lp.index()).isEqualTo(predicate.index());
+
assertThat(lp.type().getClass()).isEqualTo(predicate.type().getClass());
+ if (predicate.type() instanceof DecimalType) {
+ assertThat(((Decimal) lp.literals().get(0)).precision())
+ .isEqualTo(((Decimal)
predicate.literals().get(0)).precision());
+ assertThat(((Decimal) lp.literals().get(0)).scale())
+ .isEqualTo(((Decimal)
predicate.literals().get(0)).scale());
+ } else if (predicate.type() instanceof TimestampType) {
+ assertThat(((TimestampNtz)
lp.literals().get(0)).getMillisecond())
+ .isEqualTo(((TimestampNtz)
predicate.literals().get(0)).getMillisecond());
+ } else if (predicate.type() instanceof LocalZonedTimestampType) {
+ assertThat(((TimestampLtz)
lp.literals().get(0)).getEpochMillisecond())
+ .isEqualTo(
+ ((TimestampLtz)
predicate.literals().get(0)).getEpochMillisecond());
+ } else if (predicate.type() instanceof BinaryType
+ || predicate.type() instanceof BytesType) {
+ assertThat((byte[]) lp.literals().get(0))
+ .isEqualTo((byte[]) predicate.literals().get(0));
+ } else {
+
assertThat(lp.literals().get(0)).isEqualTo(predicate.literals().get(0));
+ }
+ }
+ }
+
+ @Test
+ public void testAllDataTypesWithNullValues() {
+ List<LeafPredicate> nullPredicates =
+ Arrays.asList(
+ // Boolean null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new BooleanType(true),
+ 0,
+ "f_bool_null",
+ Collections.singletonList(null)),
+ // Int null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new IntType(true),
+ 1,
+ "f_int_null",
+ Collections.singletonList(null)),
+ // BigInt null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new BigIntType(true),
+ 2,
+ "f_bigint_null",
+ Collections.singletonList(null)),
+ // Float null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new FloatType(true),
+ 3,
+ "f_float_null",
+ Collections.singletonList(null)),
+ // Double null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new DoubleType(true),
+ 4,
+ "f_double_null",
+ Collections.singletonList(null)),
+ // Char null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new CharType(true, 5),
+ 5,
+ "f_char_null",
+ Collections.singletonList(null)),
+ // String null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new StringType(true),
+ 6,
+ "f_string_null",
+ Collections.singletonList(null)),
+ // Decimal null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new DecimalType(true, 10, 2),
+ 7,
+ "f_decimal_null",
+ Collections.singletonList(null)),
+ // Date null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new DateType(true),
+ 8,
+ "f_date_null",
+ Collections.singletonList(null)),
+ // Time null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new TimeType(true, 3),
+ 9,
+ "f_time_null",
+ Collections.singletonList(null)),
+ // Timestamp null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new TimestampType(true, 3),
+ 10,
+ "f_ts_null",
+ Collections.singletonList(null)),
+ // TimestampLtz null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new LocalZonedTimestampType(true, 3),
+ 11,
+ "f_ltz_null",
+ Collections.singletonList(null)),
+ // Binary null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new BinaryType(4),
+ 12,
+ "f_binary_null",
+ Collections.singletonList(null)),
+ // Bytes null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new BytesType(true),
+ 13,
+ "f_bytes_null",
+ Collections.singletonList(null)),
+ // TinyInt null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new TinyIntType(true),
+ 14,
+ "f_tinyint_null",
+ Collections.singletonList(null)),
+ // SmallInt null
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new SmallIntType(true),
+ 15,
+ "f_smallint_null",
+ Collections.singletonList(null)));
+
+ RowType rowType = buildRowType(nullPredicates);
+ for (LeafPredicate predicate : nullPredicates) {
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+ assertThat(pb.totalSize()).isGreaterThan(0);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate lp = (LeafPredicate) result;
+ assertThat(lp.function()).isEqualTo(predicate.function());
+ assertThat(lp.fieldName()).isEqualTo(predicate.fieldName());
+ assertThat(lp.index()).isEqualTo(predicate.index());
+
assertThat(lp.type().getClass()).isEqualTo(predicate.type().getClass());
+ assertThat(lp.literals().get(0)).isNull();
+ }
+ }
+
+ @Test
+ public void testSerializeUsesSchemaFieldIdInsteadOfIndex() {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("payload", new StringType(true),
6),
+ new DataField("ts", new TimestampType(false,
3), 9)));
+ LeafPredicate predicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ rowType.getTypeAt(1),
+ 1,
+ "payload",
+
Collections.singletonList(BinaryString.fromString("foo")));
+
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
rowType);
+
+ assertThat(pb.getLeaf().getFieldId()).isEqualTo(6);
+ Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+ LeafPredicate leafPredicate = (LeafPredicate) result;
+ assertThat(leafPredicate.index()).isEqualTo(1);
+ assertThat(leafPredicate.fieldName()).isEqualTo("payload");
+ }
+
+ @Test
+ public void testDeserializeResolvesByFieldIdAcrossSchemaEvolution() {
+ RowType originalRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("name", new StringType(true), 6),
+ new DataField("score", new BigIntType(true),
9)));
+ LeafPredicate predicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ originalRowType.getTypeAt(1),
+ 1,
+ "name",
+
Collections.singletonList(BinaryString.fromString("Alice")));
+
+ PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate,
originalRowType);
+
+ RowType evolvedRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("age", new IntType(true), 3),
+ new DataField("full_name", new
StringType(true), 6),
+ new DataField("score", new BigIntType(true),
9)));
+
+ Predicate result = PredicateMessageUtils.toPredicate(pb,
evolvedRowType);
+
+ assertThat(result).isInstanceOf(LeafPredicate.class);
+ LeafPredicate leafPredicate = (LeafPredicate) result;
+ assertThat(leafPredicate.index()).isEqualTo(2);
+ assertThat(leafPredicate.fieldName()).isEqualTo("full_name");
+
assertThat(leafPredicate.type()).isEqualTo(evolvedRowType.getTypeAt(2));
+
assertThat(leafPredicate.literals()).containsExactly(BinaryString.fromString("Alice"));
+ }
+
+ @Test
+ public void testRoundTripAcrossSchemaEvolutionUsesFieldIdAsStableAnchor() {
+ RowType sourceRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("name", new StringType(true), 6),
+ new DataField("score", new BigIntType(true),
9)));
+ LeafPredicate predicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ sourceRowType.getTypeAt(1),
+ 1,
+ "name",
+
Collections.singletonList(BinaryString.fromString("Bob")));
+
+ PbPredicate pbPredicate =
PredicateMessageUtils.toPbPredicate(predicate, sourceRowType);
+
+ RowType targetRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("score", new BigIntType(true),
9),
+ new DataField("age", new IntType(true), 3),
+ new DataField("customer_name", new
StringType(true), 6),
+ new DataField("id", new IntType(false), 0)));
+
+ LeafPredicate restored =
+ (LeafPredicate) PredicateMessageUtils.toPredicate(pbPredicate,
targetRowType);
+
+ assertThat(pbPredicate.getLeaf().getFieldId()).isEqualTo(6);
+ assertThat(restored.index()).isEqualTo(2);
+ assertThat(restored.fieldName()).isEqualTo("customer_name");
+ assertThat(restored.type()).isEqualTo(targetRowType.getTypeAt(2));
+
assertThat(restored.literals()).containsExactly(BinaryString.fromString("Bob"));
+ }
+
+ @Test
+ public void
testDeserializeRejectsIncompatibleEvolvedFieldTypeForSameFieldId() {
+ RowType sourceRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("name", new StringType(true),
6)));
+ LeafPredicate predicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ sourceRowType.getTypeAt(1),
+ 1,
+ "name",
+
Collections.singletonList(BinaryString.fromString("Alice")));
+
+ PbPredicate pbPredicate =
PredicateMessageUtils.toPbPredicate(predicate, sourceRowType);
+
+ RowType targetRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("name_as_bigint", new
BigIntType(true), 6)));
+
+ assertThatThrownBy(() ->
PredicateMessageUtils.toPredicate(pbPredicate, targetRowType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does not match target schema field
type");
+ }
+
+ @Test
+ public void testDeserializeRejectsMissingFieldIdInTargetSchema() {
+ RowType sourceRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("name", new StringType(true),
6)));
+ LeafPredicate predicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ sourceRowType.getTypeAt(1),
+ 1,
+ "name",
+
Collections.singletonList(BinaryString.fromString("Alice")));
+
+ PbPredicate pbPredicate =
PredicateMessageUtils.toPbPredicate(predicate, sourceRowType);
+
+ RowType targetRowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("age", new IntType(true), 3)));
+
+ assertThatThrownBy(() ->
PredicateMessageUtils.toPredicate(pbPredicate, targetRowType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot resolve field id 6");
+ }
+
+ @Test
+ public void testSerializeRequiresSchemaFieldId() {
+ RowType rowType =
+ new RowType(Collections.singletonList(new DataField("id", new
IntType(false))));
+ LeafPredicate predicate =
+ new LeafPredicate(
+ Equal.INSTANCE, new IntType(false), 0, "id",
Collections.singletonList(1));
+
+ assertThatThrownBy(() ->
PredicateMessageUtils.toPbPredicate(predicate, rowType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does not have a valid field id");
+ }
+
+ @Test
+ public void testSerializeRejectsSchemaMismatch() {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField("id", new IntType(false), 0),
+ new DataField("payload", new StringType(true),
6)));
+ LeafPredicate wrongNamePredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ rowType.getTypeAt(1),
+ 1,
+ "wrong_payload",
+
Collections.singletonList(BinaryString.fromString("foo")));
+ LeafPredicate wrongTypePredicate =
+ new LeafPredicate(
+ Equal.INSTANCE,
+ new IntType(true),
+ 1,
+ "payload",
+ Collections.singletonList(1));
+
+ assertThatThrownBy(() ->
PredicateMessageUtils.toPbPredicate(wrongNamePredicate, rowType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does not match schema field");
+ assertThatThrownBy(() ->
PredicateMessageUtils.toPbPredicate(wrongTypePredicate, rowType))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does not match schema field type");
+ }
+
+ @Test
+ public void testTimestampLiteralFieldNumbersFollowReviewerLayout() throws
Exception {
+
assertThat(getPbLiteralValueFieldNumber("_TIMESTAMP_MILLIS_VALUE_FIELD_NUMBER"))
+ .isEqualTo(11);
+
assertThat(getPbLiteralValueFieldNumber("_TIMESTAMP_NANO_OF_MILLIS_VALUE_FIELD_NUMBER"))
+ .isEqualTo(12);
+
assertThat(getPbLiteralValueFieldNumber("_DECIMAL_BYTES_FIELD_NUMBER")).isEqualTo(13);
+ }
+
+ private static int getPbLiteralValueFieldNumber(String fieldName) throws
Exception {
+ Field field =
+
org.apache.fluss.rpc.messages.PbLiteralValue.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.getInt(null);
+ }
+}