This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ac21483dd [common] Support Protobuf and RPC for Predicates (#2502)
ac21483dd is described below

commit ac21483dd7b9f498ea839e459c952b8bf3a2076a
Author: Giannis Polyzos <[email protected]>
AuthorDate: Mon Mar 16 07:56:04 2026 +0100

    [common] Support Protobuf and RPC for Predicates (#2502)
---
 .../fluss/rpc/util/PredicateMessageUtils.java      | 572 +++++++++++++++
 fluss-rpc/src/main/proto/FlussApi.proto            |  66 ++
 .../fluss/rpc/util/PredicateMessageUtilsTest.java  | 781 +++++++++++++++++++++
 3 files changed, 1419 insertions(+)

diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java
new file mode 100644
index 000000000..3eb138e6f
--- /dev/null
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.util;
+
+import org.apache.fluss.predicate.And;
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.Contains;
+import org.apache.fluss.predicate.EndsWith;
+import org.apache.fluss.predicate.Equal;
+import org.apache.fluss.predicate.GreaterOrEqual;
+import org.apache.fluss.predicate.GreaterThan;
+import org.apache.fluss.predicate.In;
+import org.apache.fluss.predicate.IsNotNull;
+import org.apache.fluss.predicate.IsNull;
+import org.apache.fluss.predicate.LeafFunction;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.LessOrEqual;
+import org.apache.fluss.predicate.LessThan;
+import org.apache.fluss.predicate.NotEqual;
+import org.apache.fluss.predicate.NotIn;
+import org.apache.fluss.predicate.Or;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateVisitor;
+import org.apache.fluss.predicate.StartsWith;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.rpc.messages.PbCompoundPredicate;
+import org.apache.fluss.rpc.messages.PbLeafPredicate;
+import org.apache.fluss.rpc.messages.PbLiteralValue;
+import org.apache.fluss.rpc.messages.PbPredicate;
+import org.apache.fluss.types.DataField;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.RowType;
+
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utils for converting Predicate to PbPredicate and vice versa. */
+public class PredicateMessageUtils {
+
+    // 
-------------------------------------------------------------------------
+    //  Deserialization: PbPredicate -> Predicate
+    // 
-------------------------------------------------------------------------
+
+    public static Predicate toPredicate(PbPredicate pbPredicate, RowType 
rowType) {
+        PredicateType type = PredicateType.fromValue(pbPredicate.getType());
+        switch (type) {
+            case LEAF:
+                return toLeafPredicate(pbPredicate.getLeaf(), rowType);
+            case COMPOUND:
+                return toCompoundPredicate(pbPredicate.getCompound(), rowType);
+            default:
+                throw new IllegalArgumentException("Unknown predicate type: " 
+ type);
+        }
+    }
+
+    public static CompoundPredicate toCompoundPredicate(
+            PbCompoundPredicate pbCompound, RowType rowType) {
+        List<Predicate> children =
+                pbCompound.getChildrensList().stream()
+                        .map(child -> toPredicate(child, rowType))
+                        .collect(Collectors.toList());
+        return new CompoundPredicate(
+                
CompoundFunctionCode.fromValue(pbCompound.getFunction()).getFunction(), 
children);
+    }
+
+    private static LeafPredicate toLeafPredicate(PbLeafPredicate pbLeaf, 
RowType rowType) {
+        ResolvedField resolvedField = resolveField(rowType, 
pbLeaf.getFieldId());
+        DataType fieldType = resolvedField.field.getType();
+        String fieldName = resolvedField.field.getName();
+        List<Object> literals =
+                pbLeaf.getLiteralsList().stream()
+                        .map(lit -> toLiteralValue(lit, fieldType))
+                        .collect(Collectors.toList());
+
+        return new LeafPredicate(
+                LeafFunctionCode.fromValue(pbLeaf.getFunction()).getFunction(),
+                fieldType,
+                resolvedField.index,
+                fieldName,
+                literals);
+    }
+
+    private static Object toLiteralValue(PbLiteralValue pbLiteral, DataType 
fieldType) {
+        if (pbLiteral.isIsNull()) {
+            return null;
+        }
+        DataTypeRoot root =
+                
DataTypeRootCode.fromValue(pbLiteral.getLiteralType()).getDataTypeRoot();
+        validateLiteralType(fieldType, root);
+        switch (root) {
+            case BOOLEAN:
+                return pbLiteral.isBooleanValue();
+            case TINYINT:
+                return (byte) pbLiteral.getIntValue();
+            case SMALLINT:
+                return (short) pbLiteral.getIntValue();
+            case INTEGER:
+                return pbLiteral.getIntValue();
+            case BIGINT:
+                return pbLiteral.getBigintValue();
+            case FLOAT:
+                return pbLiteral.getFloatValue();
+            case DOUBLE:
+                return pbLiteral.getDoubleValue();
+            case CHAR:
+            case STRING:
+                String stringValue = pbLiteral.getStringValue();
+                return stringValue == null ? null : 
BinaryString.fromString(stringValue);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) fieldType;
+                if (pbLiteral.hasDecimalBytes()) {
+                    return Decimal.fromUnscaledBytes(
+                            pbLiteral.getDecimalBytes(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale());
+                } else {
+                    return Decimal.fromUnscaledLong(
+                            pbLiteral.getDecimalValue(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale());
+                }
+            case DATE:
+                return LocalDate.ofEpochDay(pbLiteral.getBigintValue());
+            case TIME_WITHOUT_TIME_ZONE:
+                return LocalTime.ofNanoOfDay(pbLiteral.getIntValue() * 
1_000_000L);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return TimestampNtz.fromMillis(
+                        pbLiteral.getTimestampMillisValue(),
+                        pbLiteral.getTimestampNanoOfMillisValue());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return TimestampLtz.fromEpochMillis(
+                        pbLiteral.getTimestampMillisValue(),
+                        pbLiteral.getTimestampNanoOfMillisValue());
+            case BINARY:
+            case BYTES:
+                return pbLiteral.getBinaryValue();
+            default:
+                throw new IllegalArgumentException("Unknown literal value 
type: " + root);
+        }
+    }
+
+    private static void validateLiteralType(DataType fieldType, DataTypeRoot 
literalRoot) {
+        DataTypeRoot fieldRoot = fieldType.getTypeRoot();
+        if (fieldRoot == literalRoot) {
+            return;
+        }
+        if (isCharacterString(fieldRoot) && isCharacterString(literalRoot)) {
+            return;
+        }
+        if (isBinaryString(fieldRoot) && isBinaryString(literalRoot)) {
+            return;
+        }
+        throw new IllegalArgumentException(
+                "Literal type "
+                        + literalRoot
+                        + " does not match target schema field type "
+                        + fieldRoot
+                        + ".");
+    }
+
+    private static boolean isCharacterString(DataTypeRoot root) {
+        return root == DataTypeRoot.CHAR || root == DataTypeRoot.STRING;
+    }
+
+    private static boolean isBinaryString(DataTypeRoot root) {
+        return root == DataTypeRoot.BINARY || root == DataTypeRoot.BYTES;
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Serialization: Predicate -> PbPredicate
+    // 
-------------------------------------------------------------------------
+
+    public static PbPredicate toPbPredicate(Predicate predicate, RowType 
rowType) {
+        return predicate.visit(
+                new PredicateVisitor<PbPredicate>() {
+                    @Override
+                    public PbPredicate visit(LeafPredicate predicate) {
+                        DataField field = resolveSourceField(predicate, 
rowType);
+                        if (field.getFieldId() < 0) {
+                            throw new IllegalArgumentException(
+                                    "Field "
+                                            + field.getName()
+                                            + " at index "
+                                            + predicate.index()
+                                            + " does not have a valid field 
id.");
+                        }
+                        PbLeafPredicate pbLeaf = new PbLeafPredicate();
+                        pbLeaf.setFunction(
+                                
LeafFunctionCode.fromFunction(predicate.function()).getValue());
+                        pbLeaf.setFieldId(field.getFieldId());
+
+                        List<PbLiteralValue> literals = new ArrayList<>();
+                        for (Object literal : predicate.literals()) {
+                            literals.add(toPbLiteralValue(field.getType(), 
literal));
+                        }
+                        pbLeaf.addAllLiterals(literals);
+
+                        PbPredicate pbPredicate = new PbPredicate();
+                        pbPredicate.setType(PredicateType.LEAF.getValue());
+                        pbPredicate.setLeaf(pbLeaf);
+                        return pbPredicate;
+                    }
+
+                    @Override
+                    public PbPredicate visit(CompoundPredicate predicate) {
+                        PbCompoundPredicate pbCompound = new 
PbCompoundPredicate();
+                        pbCompound.setFunction(
+                                
CompoundFunctionCode.fromFunction(predicate.function()).getValue());
+                        pbCompound.addAllChildrens(
+                                predicate.children().stream()
+                                        .map(child -> toPbPredicate(child, 
rowType))
+                                        .collect(Collectors.toList()));
+
+                        PbPredicate pbPredicate = new PbPredicate();
+                        pbPredicate.setType(PredicateType.COMPOUND.getValue());
+                        pbPredicate.setCompound(pbCompound);
+                        return pbPredicate;
+                    }
+                });
+    }
+
+    private static PbLiteralValue toPbLiteralValue(DataType type, Object 
literal) {
+        PbLiteralValue pbLiteral = new PbLiteralValue();
+        
pbLiteral.setLiteralType(DataTypeRootCode.fromDataTypeRoot(type.getTypeRoot()).getValue());
+        if (literal == null) {
+            pbLiteral.setIsNull(true);
+            return pbLiteral;
+        }
+        pbLiteral.setIsNull(false);
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case STRING:
+                pbLiteral.setStringValue(literal.toString());
+                break;
+            case BOOLEAN:
+                pbLiteral.setBooleanValue((Boolean) literal);
+                break;
+            case BINARY:
+            case BYTES:
+                pbLiteral.setBinaryValue((byte[]) literal);
+                break;
+            case DECIMAL:
+                Decimal decimal = (Decimal) literal;
+                if (decimal.isCompact()) {
+                    pbLiteral.setDecimalValue(decimal.toUnscaledLong());
+                } else {
+                    pbLiteral.setDecimalBytes(decimal.toUnscaledBytes());
+                }
+                break;
+            case TINYINT:
+                pbLiteral.setIntValue((Byte) literal);
+                break;
+            case SMALLINT:
+                pbLiteral.setIntValue((Short) literal);
+                break;
+            case INTEGER:
+                pbLiteral.setIntValue((Integer) literal);
+                break;
+            case DATE:
+                pbLiteral.setBigintValue(((LocalDate) literal).toEpochDay());
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                pbLiteral.setIntValue((int) (((LocalTime) 
literal).toNanoOfDay() / 1_000_000L));
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                pbLiteral.setTimestampMillisValue(((TimestampNtz) 
literal).getMillisecond());
+                pbLiteral.setTimestampNanoOfMillisValue(
+                        ((TimestampNtz) literal).getNanoOfMillisecond());
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                pbLiteral.setTimestampMillisValue(((TimestampLtz) 
literal).getEpochMillisecond());
+                pbLiteral.setTimestampNanoOfMillisValue(
+                        ((TimestampLtz) literal).getNanoOfMillisecond());
+                break;
+            case BIGINT:
+                pbLiteral.setBigintValue((Long) literal);
+                break;
+            case FLOAT:
+                pbLiteral.setFloatValue((Float) literal);
+                break;
+            case DOUBLE:
+                pbLiteral.setDoubleValue((Double) literal);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown data type: " + 
type.getTypeRoot());
+        }
+        return pbLiteral;
+    }
+
+    private static DataField resolveSourceField(LeafPredicate predicate, 
RowType rowType) {
+        if (predicate.index() < 0 || predicate.index() >= 
rowType.getFieldCount()) {
+            throw new IllegalArgumentException(
+                    "Predicate field index "
+                            + predicate.index()
+                            + " is out of bounds for row type with "
+                            + rowType.getFieldCount()
+                            + " fields.");
+        }
+        DataField field = rowType.getFields().get(predicate.index());
+        if (!field.getName().equals(predicate.fieldName())) {
+            throw new IllegalArgumentException(
+                    "Predicate field name "
+                            + predicate.fieldName()
+                            + " does not match schema field "
+                            + field.getName()
+                            + " at index "
+                            + predicate.index()
+                            + ".");
+        }
+        if (!field.getType().equals(predicate.type())) {
+            throw new IllegalArgumentException(
+                    "Predicate field type "
+                            + predicate.type()
+                            + " does not match schema field type "
+                            + field.getType()
+                            + " for field "
+                            + field.getName()
+                            + ".");
+        }
+        return field;
+    }
+
+    private static ResolvedField resolveField(RowType rowType, int fieldId) {
+        List<DataField> fields = rowType.getFields();
+        for (int i = 0; i < fields.size(); i++) {
+            DataField field = fields.get(i);
+            if (field.getFieldId() == fieldId) {
+                return new ResolvedField(i, field);
+            }
+        }
+        throw new IllegalArgumentException(
+                "Cannot resolve field id " + fieldId + " from row type.");
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Proto int32 <-> domain object mapping enums
+    // 
-------------------------------------------------------------------------
+
+    /** Maps PbPredicate.type int32 values to predicate kinds. */
+    private enum PredicateType {
+        LEAF(0),
+        COMPOUND(1);
+
+        private final int value;
+        private static final PredicateType[] VALUES = new PredicateType[2];
+
+        static {
+            for (PredicateType t : values()) {
+                VALUES[t.value] = t;
+            }
+        }
+
+        PredicateType(int value) {
+            this.value = value;
+        }
+
+        int getValue() {
+            return value;
+        }
+
+        static PredicateType fromValue(int value) {
+            if (value < 0 || value >= VALUES.length) {
+                throw new IllegalArgumentException("Unknown predicate type: " 
+ value);
+            }
+            return VALUES[value];
+        }
+    }
+
+    /** Maps PbLeafPredicate.function int32 values to {@link LeafFunction} 
instances. */
+    private enum LeafFunctionCode {
+        EQUAL(0, Equal.INSTANCE),
+        NOT_EQUAL(1, NotEqual.INSTANCE),
+        LESS_THAN(2, LessThan.INSTANCE),
+        LESS_OR_EQUAL(3, LessOrEqual.INSTANCE),
+        GREATER_THAN(4, GreaterThan.INSTANCE),
+        GREATER_OR_EQUAL(5, GreaterOrEqual.INSTANCE),
+        IS_NULL(6, IsNull.INSTANCE),
+        IS_NOT_NULL(7, IsNotNull.INSTANCE),
+        STARTS_WITH(8, StartsWith.INSTANCE),
+        CONTAINS(9, Contains.INSTANCE),
+        END_WITH(10, EndsWith.INSTANCE),
+        IN(11, In.INSTANCE),
+        NOT_IN(12, NotIn.INSTANCE);
+
+        private final int value;
+        private final LeafFunction function;
+        private static final LeafFunctionCode[] VALUES = new 
LeafFunctionCode[13];
+        private static final Map<Class<? extends LeafFunction>, 
LeafFunctionCode> FUNCTION_MAP =
+                new HashMap<>();
+
+        static {
+            for (LeafFunctionCode c : values()) {
+                VALUES[c.value] = c;
+                FUNCTION_MAP.put(c.function.getClass(), c);
+            }
+        }
+
+        LeafFunctionCode(int value, LeafFunction function) {
+            this.value = value;
+            this.function = function;
+        }
+
+        int getValue() {
+            return value;
+        }
+
+        LeafFunction getFunction() {
+            return function;
+        }
+
+        static LeafFunctionCode fromValue(int value) {
+            if (value < 0 || value >= VALUES.length) {
+                throw new IllegalArgumentException("Unknown leaf function: " + 
value);
+            }
+            return VALUES[value];
+        }
+
+        static LeafFunctionCode fromFunction(LeafFunction function) {
+            LeafFunctionCode c = FUNCTION_MAP.get(function.getClass());
+            if (c == null) {
+                throw new IllegalArgumentException("Unknown leaf function: " + 
function);
+            }
+            return c;
+        }
+    }
+
+    /** Maps PbCompoundPredicate.function int32 values to {@link 
CompoundPredicate.Function}. */
+    private enum CompoundFunctionCode {
+        AND(0, And.INSTANCE),
+        OR(1, Or.INSTANCE);
+
+        private final int value;
+        private final CompoundPredicate.Function function;
+        private static final CompoundFunctionCode[] VALUES = new 
CompoundFunctionCode[2];
+        private static final Map<Class<? extends CompoundPredicate.Function>, 
CompoundFunctionCode>
+                FUNCTION_MAP = new HashMap<>();
+
+        static {
+            for (CompoundFunctionCode c : values()) {
+                VALUES[c.value] = c;
+                FUNCTION_MAP.put(c.function.getClass(), c);
+            }
+        }
+
+        CompoundFunctionCode(int value, CompoundPredicate.Function function) {
+            this.value = value;
+            this.function = function;
+        }
+
+        int getValue() {
+            return value;
+        }
+
+        CompoundPredicate.Function getFunction() {
+            return function;
+        }
+
+        static CompoundFunctionCode fromValue(int value) {
+            if (value < 0 || value >= VALUES.length) {
+                throw new IllegalArgumentException("Unknown compound function: 
" + value);
+            }
+            return VALUES[value];
+        }
+
+        static CompoundFunctionCode fromFunction(CompoundPredicate.Function 
function) {
+            CompoundFunctionCode c = FUNCTION_MAP.get(function.getClass());
+            if (c == null) {
+                throw new IllegalArgumentException("Unknown compound function: 
" + function);
+            }
+            return c;
+        }
+    }
+
+    /**
+     * Maps PbLiteralValue.literal_type int32 values to {@link DataTypeRoot}.
+     *
+     * <p>Note: proto uses INT/VARCHAR while the domain model uses 
INTEGER/STRING.
+     */
+    private enum DataTypeRootCode {
+        BOOLEAN(0, DataTypeRoot.BOOLEAN),
+        TINYINT(1, DataTypeRoot.TINYINT),
+        SMALLINT(2, DataTypeRoot.SMALLINT),
+        INT(3, DataTypeRoot.INTEGER),
+        BIGINT(4, DataTypeRoot.BIGINT),
+        FLOAT(5, DataTypeRoot.FLOAT),
+        DOUBLE(6, DataTypeRoot.DOUBLE),
+        CHAR(7, DataTypeRoot.CHAR),
+        VARCHAR(8, DataTypeRoot.STRING),
+        DECIMAL(9, DataTypeRoot.DECIMAL),
+        DATE(10, DataTypeRoot.DATE),
+        TIME_WITHOUT_TIME_ZONE(11, DataTypeRoot.TIME_WITHOUT_TIME_ZONE),
+        TIMESTAMP_WITHOUT_TIME_ZONE(12, 
DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE),
+        TIMESTAMP_WITH_LOCAL_TIME_ZONE(13, 
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
+        BINARY(14, DataTypeRoot.BINARY),
+        BYTES(15, DataTypeRoot.BYTES);
+
+        private final int value;
+        private final DataTypeRoot dataTypeRoot;
+        private static final DataTypeRootCode[] VALUES = new 
DataTypeRootCode[16];
+        private static final Map<DataTypeRoot, DataTypeRootCode> ROOT_MAP = 
new HashMap<>();
+
+        static {
+            for (DataTypeRootCode c : values()) {
+                VALUES[c.value] = c;
+                ROOT_MAP.put(c.dataTypeRoot, c);
+            }
+        }
+
+        DataTypeRootCode(int value, DataTypeRoot dataTypeRoot) {
+            this.value = value;
+            this.dataTypeRoot = dataTypeRoot;
+        }
+
+        int getValue() {
+            return value;
+        }
+
+        DataTypeRoot getDataTypeRoot() {
+            return dataTypeRoot;
+        }
+
+        static DataTypeRootCode fromValue(int value) {
+            if (value < 0 || value >= VALUES.length) {
+                throw new IllegalArgumentException("Unknown data type root: " 
+ value);
+            }
+            return VALUES[value];
+        }
+
+        static DataTypeRootCode fromDataTypeRoot(DataTypeRoot root) {
+            DataTypeRootCode c = ROOT_MAP.get(root);
+            if (c == null) {
+                throw new IllegalArgumentException("Unknown data type root: " 
+ root);
+            }
+            return c;
+        }
+    }
+
+    private static final class ResolvedField {
+        private final int index;
+        private final DataField field;
+
+        private ResolvedField(int index, DataField field) {
+            this.index = index;
+            this.field = field;
+        }
+    }
+}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index b14238861..20b699fb1 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -1240,4 +1240,70 @@ message PbTableStatsRespForBucket {
   // Per-column statistics, keyed by column index.
   // Only populated when target_columns is specified in the request.
   // repeated PbColumnStats column_stats = 10;
+}
+
+/**
+* Definition of predicate system related message
+*/
+
+// PbPredicateType constants (using int32 instead of enum for proto3 
compatibility)
+// LEAF = 0, COMPOUND = 1
+
+// Represents a predicate that can be serialized and transmitted across
+// languages
+message PbPredicate {
+  // See PbPredicateType constants: LEAF = 0, COMPOUND = 1
+  required int32 type = 1;
+  optional PbLeafPredicate leaf = 2;
+  optional PbCompoundPredicate compound = 3;
+}
+
+// Represents a leaf predicate that compares a field with literals
+message PbLeafPredicate {
+  // The function to apply (see PbLeafFunction constants: EQUAL=0 ... 
NOT_IN=12)
+  required int32 function = 1;
+  // The schema field id of the referenced top-level field
+  required int32 field_id = 2;
+  // The literals to compare with
+  repeated PbLiteralValue literals = 3;
+}
+
+// Represents a compound predicate that combines multiple predicates
+message PbCompoundPredicate {
+  // The function to apply (see PbCompoundFunction constants: AND=0, OR=1)
+  required int32 function = 1;
+  // The child predicates
+  repeated PbPredicate children = 2;
+}
+
+// PbLeafFunction constants (using int32 instead of enum for proto3 
compatibility)
+// EQUAL=0, NOT_EQUAL=1, LESS_THAN=2, LESS_OR_EQUAL=3, GREATER_THAN=4,
+// GREATER_OR_EQUAL=5, IS_NULL=6, IS_NOT_NULL=7, STARTS_WITH=8, CONTAINS=9,
+// END_WITH=10, IN=11, NOT_IN=12
+
+// PbCompoundFunction constants (using int32 instead of enum for proto3 
compatibility)
+// AND=0, OR=1
+
+// PbDataTypeRoot constants (using int32 instead of enum for proto3 
compatibility)
+// BOOLEAN=0, TINYINT=1, SMALLINT=2, INT=3, BIGINT=4, FLOAT=5, DOUBLE=6,
+// CHAR=7, VARCHAR=8, DECIMAL=9, DATE=10, TIME_WITHOUT_TIME_ZONE=11,
+// TIMESTAMP_WITHOUT_TIME_ZONE=12, TIMESTAMP_WITH_LOCAL_TIME_ZONE=13,
+// BINARY=14, BYTES=15
+
+// Represents a literal value
+message PbLiteralValue {
+  // See PbDataTypeRoot constants: indicates which value field is populated
+  required int32 literal_type = 1;
+  required bool is_null = 2;
+  optional bool boolean_value = 3;
+  optional int32 int_value = 4;
+  optional int64 bigint_value = 5;
+  optional float float_value = 6;
+  optional double double_value = 7;
+  optional string string_value = 8;
+  optional bytes binary_value = 9;
+  optional int64 decimal_value = 10;   // Serialized decimal (compact mode)
+  optional int64 timestamp_millis_value = 11;      // Epoch millis
+  optional int32 timestamp_nano_of_millis_value = 12; // Nano of millis
+  optional bytes decimal_bytes = 13;   // Serialized decimal (non-compact mode)
 }
\ No newline at end of file
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java
new file mode 100644
index 000000000..8e1e66f76
--- /dev/null
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.util;
+
+import org.apache.fluss.predicate.And;
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.Equal;
+import org.apache.fluss.predicate.GreaterThan;
+import org.apache.fluss.predicate.In;
+import org.apache.fluss.predicate.IsNull;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.LessThan;
+import org.apache.fluss.predicate.Or;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.rpc.messages.PbPredicate;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataField;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** PredicateMessageUtilsTest. */
+public class PredicateMessageUtilsTest {
+
+    /**
+     * Builds a RowType that covers all field indices used by the given 
predicates. Positions not
+     * covered by any predicate are filled with a nullable IntType 
placeholder. Each field gets a
+     * stable, non-ordinal field id so tests validate field-id based 
serialization.
+     */
+    private static RowType buildRowType(List<LeafPredicate> predicates) {
+        int maxIndex = 0;
+        for (LeafPredicate p : predicates) {
+            if (p.index() > maxIndex) {
+                maxIndex = p.index();
+            }
+        }
+        DataField[] fields = new DataField[maxIndex + 1];
+        for (int i = 0; i <= maxIndex; i++) {
+            fields[i] = new DataField("_placeholder_" + i, new IntType(true), 
100 + i * 10);
+        }
+        for (LeafPredicate p : predicates) {
+            fields[p.index()] = new DataField(p.fieldName(), p.type(), 100 + 
p.index() * 10);
+        }
+        return new RowType(Arrays.asList(fields));
+    }
+
+    private static RowType buildRowType(LeafPredicate... predicates) {
+        return buildRowType(Arrays.asList(predicates));
+    }
+
+    @Test
+    public void testLeafPredicateIntEqual() {
+        DataType type = new IntType(false);
+        LeafPredicate predicate =
+                new LeafPredicate(Equal.INSTANCE, type, 0, "id", 
Collections.singletonList(123));
+        RowType rowType = buildRowType(predicate);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        
assertThat(pb.getLeaf().getFieldId()).isEqualTo(rowType.getFields().get(0).getFieldId());
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate lp = (LeafPredicate) result;
+        assertThat(lp.function()).isEqualTo(Equal.INSTANCE);
+        assertThat(lp.fieldName()).isEqualTo("id");
+        assertThat(lp.literals().get(0)).isEqualTo(123);
+    }
+
+    @Test
+    public void testLeafPredicateStringIn() {
+        DataType type = new StringType(true);
+        List<Object> values =
+                Arrays.asList(
+                        BinaryString.fromString("foo"),
+                        BinaryString.fromString("bar"),
+                        BinaryString.fromString("baz"));
+        LeafPredicate predicate = new LeafPredicate(In.INSTANCE, type, 1, 
"name", values);
+        RowType rowType = buildRowType(predicate);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate lp = (LeafPredicate) result;
+        assertThat(lp.function()).isEqualTo(In.INSTANCE);
+        assertThat(lp.fieldName()).isEqualTo("name");
+        assertThat(lp.literals()).isEqualTo(values);
+    }
+
+    @Test
+    public void testLeafPredicateIsNull() {
+        DataType type = new IntType(true);
+        LeafPredicate predicate =
+                new LeafPredicate(IsNull.INSTANCE, type, 2, "age", 
Collections.emptyList());
+        RowType rowType = buildRowType(predicate);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate lp = (LeafPredicate) result;
+        assertThat(lp.function()).isEqualTo(IsNull.INSTANCE);
+        assertThat(lp.fieldName()).isEqualTo("age");
+    }
+
+    @Test
+    public void testLeafPredicateDecimal() {
+        DataType type = new DecimalType(false, 10, 2);
+        Decimal decimal = Decimal.fromBigDecimal(new BigDecimal("1234.56"), 
10, 2);
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE, type, 3, "amount", 
Collections.singletonList(decimal));
+        RowType rowType = buildRowType(predicate);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate lp = (LeafPredicate) result;
+        assertThat(lp.function()).isEqualTo(Equal.INSTANCE);
+        assertThat(lp.fieldName()).isEqualTo("amount");
+        assertThat(lp.literals()).hasSize(1);
+        assertThat(((Decimal) lp.literals().get(0)).toBigDecimal())
+                .isEqualByComparingTo(decimal.toBigDecimal());
+        assertThat(((Decimal) 
lp.literals().get(0)).precision()).isEqualTo(decimal.precision());
+        assertThat(((Decimal) 
lp.literals().get(0)).scale()).isEqualTo(decimal.scale());
+    }
+
+    @Test
+    public void testLeafPredicateTimestamp() {
+        DataType type = new TimestampType(false, 3);
+        TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3);
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        GreaterThan.INSTANCE, type, 4, "ts", 
Collections.singletonList(ts));
+        RowType rowType = buildRowType(predicate);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate lp = (LeafPredicate) result;
+        assertThat(lp.function()).isEqualTo(GreaterThan.INSTANCE);
+        assertThat(lp.fieldName()).isEqualTo("ts");
+        assertThat(lp.literals()).hasSize(1);
+        assertThat(((TimestampNtz) lp.literals().get(0)).getMillisecond())
+                .isEqualTo(ts.getMillisecond());
+        assertThat(((TimestampNtz) 
lp.literals().get(0)).getNanoOfMillisecond())
+                .isEqualTo(ts.getNanoOfMillisecond());
+    }
+
+    @Test
+    public void testLeafPredicateBoolean() {
+        DataType type = new BooleanType(false);
+        LeafPredicate predicate =
+                new LeafPredicate(Equal.INSTANCE, type, 5, "flag", 
Collections.singletonList(true));
+        RowType rowType = buildRowType(predicate);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate lp = (LeafPredicate) result;
+        assertThat(lp.function()).isEqualTo(Equal.INSTANCE);
+        assertThat(lp.fieldName()).isEqualTo("flag");
+        assertThat(lp.literals().get(0)).isEqualTo(true);
+    }
+
+    @Test
+    public void testCompoundPredicateAnd() {
+        DataType type = new IntType(false);
+        LeafPredicate p1 =
+                new LeafPredicate(
+                        GreaterThan.INSTANCE, type, 0, "id", 
Collections.singletonList(10));
+        LeafPredicate p2 =
+                new LeafPredicate(LessThan.INSTANCE, type, 0, "id", 
Collections.singletonList(100));
+        CompoundPredicate andPredicate = new CompoundPredicate(And.INSTANCE, 
Arrays.asList(p1, p2));
+        RowType rowType = buildRowType(p1, p2);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(andPredicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate cp = (CompoundPredicate) result;
+        assertThat(cp.function()).isEqualTo(And.INSTANCE);
+        assertThat(cp.children()).hasSize(2);
+    }
+
+    @Test
+    public void testCompoundPredicateOrNested() {
+        DataType type = new StringType(false);
+        LeafPredicate p1 =
+                new LeafPredicate(
+                        Equal.INSTANCE, type, 0, "name", 
Collections.singletonList("foo"));
+        LeafPredicate p2 =
+                new LeafPredicate(
+                        Equal.INSTANCE, type, 0, "name", 
Collections.singletonList("bar"));
+        CompoundPredicate orPredicate = new CompoundPredicate(Or.INSTANCE, 
Arrays.asList(p1, p2));
+        CompoundPredicate andPredicate =
+                new CompoundPredicate(And.INSTANCE, Arrays.asList(orPredicate, 
p1));
+        RowType rowType = buildRowType(p1, p2);
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(andPredicate, 
rowType);
+        assertThat(pb.totalSize()).isGreaterThan(0);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        assertThat(result).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate cp = (CompoundPredicate) result;
+        assertThat(cp.function()).isEqualTo(And.INSTANCE);
+        assertThat(cp.children()).hasSize(2);
+        assertThat(cp.children().get(0)).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate orCp = (CompoundPredicate) cp.children().get(0);
+        assertThat(orCp.function()).isEqualTo(Or.INSTANCE);
+    }
+
+    @Test
+    public void testPbLiteralSerde() {
+        // boolean
+        DataType boolType = new BooleanType(false);
+        LeafPredicate boolPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE, boolType, 0, "f_bool", 
Collections.singletonList(true));
+        // int
+        DataType intType = new IntType(false);
+        LeafPredicate intPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE, intType, 1, "f_int", 
Collections.singletonList(123));
+        // bigint
+        DataType bigIntType = new BigIntType(false);
+        LeafPredicate bigIntPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        bigIntType,
+                        2,
+                        "f_bigint",
+                        Collections.singletonList(1234567890123L));
+        // float
+        DataType floatType = new FloatType(false);
+        LeafPredicate floatPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE, floatType, 3, "f_float", 
Collections.singletonList(1.23f));
+        // double
+        DataType doubleType = new DoubleType(false);
+        LeafPredicate doublePredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        doubleType,
+                        4,
+                        "f_double",
+                        Collections.singletonList(2.34d));
+        // char
+        DataType charType = new CharType(false, 5);
+        LeafPredicate charPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        charType,
+                        5,
+                        "f_char",
+                        
Collections.singletonList(BinaryString.fromString("abcde")));
+        // string
+        DataType stringType = new StringType(false);
+        LeafPredicate stringPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        stringType,
+                        6,
+                        "f_string",
+                        
Collections.singletonList(BinaryString.fromString("hello")));
+        // decimal
+        DataType decimalType = new DecimalType(false, 10, 2);
+        Decimal decimal = Decimal.fromBigDecimal(new BigDecimal("1234.56"), 
10, 2);
+        LeafPredicate decimalPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        decimalType,
+                        7,
+                        "f_decimal",
+                        Collections.singletonList(decimal));
+        // date
+        DataType dateType = new DateType(false);
+        LeafPredicate datePredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        dateType,
+                        8,
+                        "f_date",
+                        Collections.singletonList(
+                                LocalDate.ofEpochDay(19000L))); // days since 
epoch
+        // time
+        DataType timeType = new TimeType(false, 3);
+        LocalTime time = LocalTime.of(12, 30);
+        LeafPredicate timePredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        timeType,
+                        9,
+                        "f_time",
+                        Collections.singletonList(time)); // millis of day
+        // timestamp
+        DataType tsType = new TimestampType(false, 3);
+        TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3);
+        LeafPredicate tsPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE, tsType, 10, "f_ts", 
Collections.singletonList(ts));
+        // timestamp_ltz
+        DataType ltzType = new LocalZonedTimestampType(false, 3);
+        TimestampLtz ltz = TimestampLtz.fromEpochMillis(1680000000000L, 3);
+        LeafPredicate ltzPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE, ltzType, 11, "f_ltz", 
Collections.singletonList(ltz));
+        // binary
+        DataType binaryType = new BinaryType(4);
+        LeafPredicate binaryPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        binaryType,
+                        12,
+                        "f_binary",
+                        Collections.singletonList(new byte[] {1, 2, 3, 4}));
+        // bytes
+        DataType bytesType = new BytesType(false);
+        LeafPredicate bytesPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        bytesType,
+                        13,
+                        "f_bytes",
+                        Collections.singletonList(new byte[] {5, 6, 7, 8}));
+        // tinyint
+        DataType tinyIntType = new TinyIntType(false);
+        LeafPredicate tinyIntPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        tinyIntType,
+                        14,
+                        "f_tinyint",
+                        Collections.singletonList((byte) 7));
+        // smallint
+        DataType smallIntType = new SmallIntType(false);
+        LeafPredicate smallIntPredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        smallIntType,
+                        15,
+                        "f_smallint",
+                        Collections.singletonList((short) 1234));
+
+        List<LeafPredicate> predicates =
+                Arrays.asList(
+                        boolPredicate,
+                        intPredicate,
+                        bigIntPredicate,
+                        floatPredicate,
+                        doublePredicate,
+                        charPredicate,
+                        stringPredicate,
+                        decimalPredicate,
+                        datePredicate,
+                        timePredicate,
+                        tsPredicate,
+                        ltzPredicate,
+                        binaryPredicate,
+                        bytesPredicate,
+                        tinyIntPredicate,
+                        smallIntPredicate);
+        RowType rowType = buildRowType(predicates);
+        for (LeafPredicate predicate : predicates) {
+            PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+            assertThat(pb.totalSize()).isGreaterThan(0);
+            assertThat(pb.getLeaf().getFieldId())
+                    
.isEqualTo(rowType.getFields().get(predicate.index()).getFieldId());
+            Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+            assertThat(result).isInstanceOf(LeafPredicate.class);
+            LeafPredicate lp = (LeafPredicate) result;
+            assertThat(lp.function()).isEqualTo(predicate.function());
+            assertThat(lp.fieldName()).isEqualTo(predicate.fieldName());
+            assertThat(lp.index()).isEqualTo(predicate.index());
+            
assertThat(lp.type().getClass()).isEqualTo(predicate.type().getClass());
+            if (predicate.type() instanceof DecimalType) {
+                assertThat(((Decimal) lp.literals().get(0)).precision())
+                        .isEqualTo(((Decimal) 
predicate.literals().get(0)).precision());
+                assertThat(((Decimal) lp.literals().get(0)).scale())
+                        .isEqualTo(((Decimal) 
predicate.literals().get(0)).scale());
+            } else if (predicate.type() instanceof TimestampType) {
+                assertThat(((TimestampNtz) 
lp.literals().get(0)).getMillisecond())
+                        .isEqualTo(((TimestampNtz) 
predicate.literals().get(0)).getMillisecond());
+            } else if (predicate.type() instanceof LocalZonedTimestampType) {
+                assertThat(((TimestampLtz) 
lp.literals().get(0)).getEpochMillisecond())
+                        .isEqualTo(
+                                ((TimestampLtz) 
predicate.literals().get(0)).getEpochMillisecond());
+            } else if (predicate.type() instanceof BinaryType
+                    || predicate.type() instanceof BytesType) {
+                assertThat((byte[]) lp.literals().get(0))
+                        .isEqualTo((byte[]) predicate.literals().get(0));
+            } else {
+                
assertThat(lp.literals().get(0)).isEqualTo(predicate.literals().get(0));
+            }
+        }
+    }
+
+    @Test
+    public void testAllDataTypesWithNullValues() {
+        List<LeafPredicate> nullPredicates =
+                Arrays.asList(
+                        // Boolean null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new BooleanType(true),
+                                0,
+                                "f_bool_null",
+                                Collections.singletonList(null)),
+                        // Int null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new IntType(true),
+                                1,
+                                "f_int_null",
+                                Collections.singletonList(null)),
+                        // BigInt null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new BigIntType(true),
+                                2,
+                                "f_bigint_null",
+                                Collections.singletonList(null)),
+                        // Float null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new FloatType(true),
+                                3,
+                                "f_float_null",
+                                Collections.singletonList(null)),
+                        // Double null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new DoubleType(true),
+                                4,
+                                "f_double_null",
+                                Collections.singletonList(null)),
+                        // Char null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new CharType(true, 5),
+                                5,
+                                "f_char_null",
+                                Collections.singletonList(null)),
+                        // String null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new StringType(true),
+                                6,
+                                "f_string_null",
+                                Collections.singletonList(null)),
+                        // Decimal null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new DecimalType(true, 10, 2),
+                                7,
+                                "f_decimal_null",
+                                Collections.singletonList(null)),
+                        // Date null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new DateType(true),
+                                8,
+                                "f_date_null",
+                                Collections.singletonList(null)),
+                        // Time null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new TimeType(true, 3),
+                                9,
+                                "f_time_null",
+                                Collections.singletonList(null)),
+                        // Timestamp null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new TimestampType(true, 3),
+                                10,
+                                "f_ts_null",
+                                Collections.singletonList(null)),
+                        // TimestampLtz null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new LocalZonedTimestampType(true, 3),
+                                11,
+                                "f_ltz_null",
+                                Collections.singletonList(null)),
+                        // Binary null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new BinaryType(4),
+                                12,
+                                "f_binary_null",
+                                Collections.singletonList(null)),
+                        // Bytes null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new BytesType(true),
+                                13,
+                                "f_bytes_null",
+                                Collections.singletonList(null)),
+                        // TinyInt null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new TinyIntType(true),
+                                14,
+                                "f_tinyint_null",
+                                Collections.singletonList(null)),
+                        // SmallInt null
+                        new LeafPredicate(
+                                Equal.INSTANCE,
+                                new SmallIntType(true),
+                                15,
+                                "f_smallint_null",
+                                Collections.singletonList(null)));
+
+        RowType rowType = buildRowType(nullPredicates);
+        for (LeafPredicate predicate : nullPredicates) {
+            PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+            assertThat(pb.totalSize()).isGreaterThan(0);
+            Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+            assertThat(result).isInstanceOf(LeafPredicate.class);
+            LeafPredicate lp = (LeafPredicate) result;
+            assertThat(lp.function()).isEqualTo(predicate.function());
+            assertThat(lp.fieldName()).isEqualTo(predicate.fieldName());
+            assertThat(lp.index()).isEqualTo(predicate.index());
+            
assertThat(lp.type().getClass()).isEqualTo(predicate.type().getClass());
+            assertThat(lp.literals().get(0)).isNull();
+        }
+    }
+
+    @Test
+    public void testSerializeUsesSchemaFieldIdInsteadOfIndex() {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("payload", new StringType(true), 
6),
+                                new DataField("ts", new TimestampType(false, 
3), 9)));
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        rowType.getTypeAt(1),
+                        1,
+                        "payload",
+                        
Collections.singletonList(BinaryString.fromString("foo")));
+
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
rowType);
+
+        assertThat(pb.getLeaf().getFieldId()).isEqualTo(6);
+        Predicate result = PredicateMessageUtils.toPredicate(pb, rowType);
+        LeafPredicate leafPredicate = (LeafPredicate) result;
+        assertThat(leafPredicate.index()).isEqualTo(1);
+        assertThat(leafPredicate.fieldName()).isEqualTo("payload");
+    }
+
+    @Test
+    public void testDeserializeResolvesByFieldIdAcrossSchemaEvolution() {
+        RowType originalRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("name", new StringType(true), 6),
+                                new DataField("score", new BigIntType(true), 
9)));
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        originalRowType.getTypeAt(1),
+                        1,
+                        "name",
+                        
Collections.singletonList(BinaryString.fromString("Alice")));
+
+        PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, 
originalRowType);
+
+        RowType evolvedRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("age", new IntType(true), 3),
+                                new DataField("full_name", new 
StringType(true), 6),
+                                new DataField("score", new BigIntType(true), 
9)));
+
+        Predicate result = PredicateMessageUtils.toPredicate(pb, 
evolvedRowType);
+
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate leafPredicate = (LeafPredicate) result;
+        assertThat(leafPredicate.index()).isEqualTo(2);
+        assertThat(leafPredicate.fieldName()).isEqualTo("full_name");
+        
assertThat(leafPredicate.type()).isEqualTo(evolvedRowType.getTypeAt(2));
+        
assertThat(leafPredicate.literals()).containsExactly(BinaryString.fromString("Alice"));
+    }
+
+    @Test
+    public void testRoundTripAcrossSchemaEvolutionUsesFieldIdAsStableAnchor() {
+        RowType sourceRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("name", new StringType(true), 6),
+                                new DataField("score", new BigIntType(true), 
9)));
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        sourceRowType.getTypeAt(1),
+                        1,
+                        "name",
+                        
Collections.singletonList(BinaryString.fromString("Bob")));
+
+        PbPredicate pbPredicate = 
PredicateMessageUtils.toPbPredicate(predicate, sourceRowType);
+
+        RowType targetRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("score", new BigIntType(true), 
9),
+                                new DataField("age", new IntType(true), 3),
+                                new DataField("customer_name", new 
StringType(true), 6),
+                                new DataField("id", new IntType(false), 0)));
+
+        LeafPredicate restored =
+                (LeafPredicate) PredicateMessageUtils.toPredicate(pbPredicate, 
targetRowType);
+
+        assertThat(pbPredicate.getLeaf().getFieldId()).isEqualTo(6);
+        assertThat(restored.index()).isEqualTo(2);
+        assertThat(restored.fieldName()).isEqualTo("customer_name");
+        assertThat(restored.type()).isEqualTo(targetRowType.getTypeAt(2));
+        
assertThat(restored.literals()).containsExactly(BinaryString.fromString("Bob"));
+    }
+
+    @Test
+    public void 
testDeserializeRejectsIncompatibleEvolvedFieldTypeForSameFieldId() {
+        RowType sourceRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("name", new StringType(true), 
6)));
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        sourceRowType.getTypeAt(1),
+                        1,
+                        "name",
+                        
Collections.singletonList(BinaryString.fromString("Alice")));
+
+        PbPredicate pbPredicate = 
PredicateMessageUtils.toPbPredicate(predicate, sourceRowType);
+
+        RowType targetRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("name_as_bigint", new 
BigIntType(true), 6)));
+
+        assertThatThrownBy(() -> 
PredicateMessageUtils.toPredicate(pbPredicate, targetRowType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("does not match target schema field 
type");
+    }
+
+    @Test
+    public void testDeserializeRejectsMissingFieldIdInTargetSchema() {
+        RowType sourceRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("name", new StringType(true), 
6)));
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        sourceRowType.getTypeAt(1),
+                        1,
+                        "name",
+                        
Collections.singletonList(BinaryString.fromString("Alice")));
+
+        PbPredicate pbPredicate = 
PredicateMessageUtils.toPbPredicate(predicate, sourceRowType);
+
+        RowType targetRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("age", new IntType(true), 3)));
+
+        assertThatThrownBy(() -> 
PredicateMessageUtils.toPredicate(pbPredicate, targetRowType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Cannot resolve field id 6");
+    }
+
+    @Test
+    public void testSerializeRequiresSchemaFieldId() {
+        RowType rowType =
+                new RowType(Collections.singletonList(new DataField("id", new 
IntType(false))));
+        LeafPredicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE, new IntType(false), 0, "id", 
Collections.singletonList(1));
+
+        assertThatThrownBy(() -> 
PredicateMessageUtils.toPbPredicate(predicate, rowType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("does not have a valid field id");
+    }
+
+    @Test
+    public void testSerializeRejectsSchemaMismatch() {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", new IntType(false), 0),
+                                new DataField("payload", new StringType(true), 
6)));
+        LeafPredicate wrongNamePredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        rowType.getTypeAt(1),
+                        1,
+                        "wrong_payload",
+                        
Collections.singletonList(BinaryString.fromString("foo")));
+        LeafPredicate wrongTypePredicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        new IntType(true),
+                        1,
+                        "payload",
+                        Collections.singletonList(1));
+
+        assertThatThrownBy(() -> 
PredicateMessageUtils.toPbPredicate(wrongNamePredicate, rowType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("does not match schema field");
+        assertThatThrownBy(() -> 
PredicateMessageUtils.toPbPredicate(wrongTypePredicate, rowType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("does not match schema field type");
+    }
+
+    @Test
+    public void testTimestampLiteralFieldNumbersFollowReviewerLayout() throws 
Exception {
+        
assertThat(getPbLiteralValueFieldNumber("_TIMESTAMP_MILLIS_VALUE_FIELD_NUMBER"))
+                .isEqualTo(11);
+        
assertThat(getPbLiteralValueFieldNumber("_TIMESTAMP_NANO_OF_MILLIS_VALUE_FIELD_NUMBER"))
+                .isEqualTo(12);
+        
assertThat(getPbLiteralValueFieldNumber("_DECIMAL_BYTES_FIELD_NUMBER")).isEqualTo(13);
+    }
+
+    private static int getPbLiteralValueFieldNumber(String fieldName) throws 
Exception {
+        Field field =
+                
org.apache.fluss.rpc.messages.PbLiteralValue.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.getInt(null);
+    }
+}

Reply via email to