This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit c843f966a3b15ded41a72c2baf16b94e02b1839b Author: Alibaba-HZY <[email protected]> AuthorDate: Mon Mar 3 11:36:23 2025 +0800 [common] Introduce Predicate interface and basic predicate expressions (#515) --- .../main/java/com/alibaba/fluss/predicate/And.java | 67 +++ .../com/alibaba/fluss/predicate/CompareUtils.java | 60 +++ .../alibaba/fluss/predicate/CompoundPredicate.java | 114 ++++++ .../java/com/alibaba/fluss/predicate/Contains.java | 62 +++ .../java/com/alibaba/fluss/predicate/EndsWith.java | 65 +++ .../java/com/alibaba/fluss/predicate/Equal.java | 58 +++ .../java/com/alibaba/fluss/predicate/FieldRef.java | 79 ++++ .../alibaba/fluss/predicate/FunctionVisitor.java | 83 ++++ .../alibaba/fluss/predicate/GreaterOrEqual.java | 58 +++ .../com/alibaba/fluss/predicate/GreaterThan.java | 58 +++ .../main/java/com/alibaba/fluss/predicate/In.java | 83 ++++ .../com/alibaba/fluss/predicate/IsNotNull.java | 55 +++ .../java/com/alibaba/fluss/predicate/IsNull.java | 55 +++ .../com/alibaba/fluss/predicate/LeafFunction.java | 65 +++ .../com/alibaba/fluss/predicate/LeafPredicate.java | 179 ++++++++ .../alibaba/fluss/predicate/LeafUnaryFunction.java | 53 +++ .../com/alibaba/fluss/predicate/LessOrEqual.java | 58 +++ .../java/com/alibaba/fluss/predicate/LessThan.java | 58 +++ .../java/com/alibaba/fluss/predicate/NotEqual.java | 58 +++ .../java/com/alibaba/fluss/predicate/NotIn.java | 82 ++++ .../predicate/NullFalseLeafBinaryFunction.java | 61 +++ .../main/java/com/alibaba/fluss/predicate/Or.java | 67 +++ .../fluss/predicate/PartitionPredicateVisitor.java | 51 +++ .../com/alibaba/fluss/predicate/Predicate.java | 50 +++ .../alibaba/fluss/predicate/PredicateBuilder.java | 454 +++++++++++++++++++++ .../alibaba/fluss/predicate/PredicateVisitor.java | 30 ++ .../com/alibaba/fluss/predicate/StartsWith.java | 69 ++++ .../fluss/predicate/UnsupportedExpression.java | 29 ++ .../main/java/com/alibaba/fluss/types/RowType.java | 10 + .../com/alibaba/fluss/utils/BinaryStringUtils.java | 8 + 
.../com/alibaba/fluss/utils/DateTimeUtils.java | 10 + .../fluss/predicate/PredicateBuilderTest.java | 85 ++++ .../com/alibaba/fluss/predicate/PredicateTest.java | 424 +++++++++++++++++++ fluss-test-coverage/pom.xml | 2 + 34 files changed, 2800 insertions(+) diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java new file mode 100644 index 000000000..204bb7e3d --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.InternalRow; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link CompoundPredicate.Function} to eval and. 
*/ public class And extends CompoundPredicate.Function { + + private static final long serialVersionUID = -2977938814804928712L; + + public static final And INSTANCE = new And(); + + private And() {} + + @Override + public boolean test(InternalRow row, List<Predicate> children) { + for (Predicate child : children) { + if (!child.test(row)) { + return false; /* short-circuit on the first child that evaluates to false */ + } + } + return true; /* also true for an empty child list */ + } + + @Override + public Optional<Predicate> negate(List<Predicate> children) { + List<Predicate> negatedChildren = new ArrayList<>(); + for (Predicate child : children) { + Optional<Predicate> negatedChild = child.negate(); + if (negatedChild.isPresent()) { + negatedChildren.add(negatedChild.get()); + } else { + return Optional.empty(); /* one child cannot be negated, so neither can the conjunction */ + } + } + return Optional.of(new CompoundPredicate(Or.INSTANCE, negatedChildren)); /* De Morgan: NOT(a AND b) == NOT(a) OR NOT(b) */ + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, List<T> children) { + return visitor.visitAnd(children); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java new file mode 100644 index 000000000..50380abf8 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.types.DataType; + +import static java.lang.Math.min; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Utilities for comparing values used in predicates. */ +public class CompareUtils { + private CompareUtils() {} + + /** Compares {@code v1} with {@code v2}, returning a negative, zero or positive value. When {@code v1} is Comparable, any BinaryString operand is first converted to a String; byte arrays are compared lexicographically; any other {@code v1} throws a RuntimeException naming {@code type}. */ public static int compareLiteral(DataType type, Object v1, Object v2) { + if (v1 instanceof Comparable) { + // Because BinaryString cannot be serialized, v1 or v2 may be a BinaryString; convert + // it to a String before comparing. + if (v1 instanceof BinaryString) { + v1 = ((BinaryString) v1).toString(); + } + if (v2 instanceof BinaryString) { + v2 = ((BinaryString) v2).toString(); + } + return ((Comparable<Object>) v1).compareTo(v2); + } else if (v1 instanceof byte[]) { + return compare((byte[]) v1, (byte[]) v2); + } else { + throw new RuntimeException("Unsupported type: " + type); + } + } + + private static int compare(byte[] first, byte[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = first[x] - second[x]; /* NOTE(review): Java bytes are signed, so 0x80..0xFF sort before 0x00..0x7F here; confirm this is the intended order for binary values. */ + if (cmp != 0) { + return cmp; + } + } + return first.length - second.length; /* on a common prefix, the shorter array sorts first */ + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java new file mode 100644 index 000000000..449598f2f --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.InternalRow; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Non-leaf node in a {@link Predicate} tree. Its evaluation result depends on the results of its + * children. 
*/ public class CompoundPredicate implements Predicate { + + private final Function function; + private final List<Predicate> children; + + /** Combines {@code children} with {@code function}; the list is stored as given, not copied. */ public CompoundPredicate(Function function, List<Predicate> children) { + this.function = function; + this.children = children; + } + + public Function function() { + return function; + } + + public List<Predicate> children() { + return children; + } + + @Override + public boolean test(InternalRow row) { + return function.test(row, children); + } + + @Override + public Optional<Predicate> negate() { + return function.negate(children); + } + + @Override + public <T> T visit(PredicateVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof CompoundPredicate)) { + return false; + } + CompoundPredicate that = (CompoundPredicate) o; + return Objects.equals(function, that.function) && Objects.equals(children, that.children); + } + + @Override + public int hashCode() { + return Objects.hash(function, children); + } + + @Override + public String toString() { + return function + "(" + children + ")"; + } + + /** Combining logic (such as {@link And}) that evaluates, negates and visits a list of child {@link Predicate}s.
*/ public abstract static class Function implements Serializable { + + /** Evaluates {@code children} against {@code row} and combines their results. */ public abstract boolean test(InternalRow row, List<Predicate> children); + + /** Returns the negation of this function over {@code children}, or empty if it cannot be expressed. */ public abstract Optional<Predicate> negate(List<Predicate> children); + + /** Passes the already-visited child results to the matching {@code visitor} callback. */ public abstract <T> T visit(FunctionVisitor<T> visitor, List<T> children); + + @Override + public int hashCode() { + return this.getClass().getName().hashCode(); /* consistent with equals: identity is the concrete class */ + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java new file mode 100644 index 000000000..cfdaf0a06 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link NullFalseLeafBinaryFunction} to evaluate {@code filter like '%abc%'}. */ +public class Contains extends NullFalseLeafBinaryFunction { + + public static final Contains INSTANCE = new Contains(); + + private Contains() {} + + @Override + public boolean test(DataType type, Object field, Object patternLiteral) { + String fieldString = field.toString(); + return fieldString.contains(patternLiteral.toString()); /* toString() rather than a (String) cast: the literal may also be a BinaryString */ + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + Object patternLiteral) { + return true; /* min/max statistics cannot rule out a substring match */ + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.empty(); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitContains(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java new file mode 100644 index 000000000..a6af4c2e1 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * A {@link NullFalseLeafBinaryFunction} to evaluate {@code filter like '%abc'}, i.e. whether the + * field string ends with the literal. + */ +public class EndsWith extends NullFalseLeafBinaryFunction { + + public static final EndsWith INSTANCE = new EndsWith(); + + private EndsWith() {} + + @Override + public boolean test(DataType type, Object field, Object patternLiteral) { + String fieldString = field.toString(); + return fieldString.endsWith(patternLiteral.toString()); /* toString() rather than a (String) cast: the literal may also be a BinaryString */ + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + Object patternLiteral) { + return true; /* min/max statistics cannot rule out a suffix match */ + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.empty(); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitEndsWith(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java new file mode 100644 index 000000000..26b181749 --- /dev/null +++
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link NullFalseLeafBinaryFunction} to eval equal. 
*/ public class Equal extends NullFalseLeafBinaryFunction { + + public static final Equal INSTANCE = new Equal(); + + private Equal() {} + + @Override + public boolean test(DataType type, Object field, Object literal) { + return compareLiteral(type, literal, field) == 0; + } + + @Override + public boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) { + return compareLiteral(type, literal, min) >= 0 && compareLiteral(type, literal, max) <= 0; /* the literal can occur only if min <= literal <= max */ + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(NotEqual.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitEqual(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java new file mode 100644 index 000000000..d98dc92ba --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.io.Serializable; +import java.util.Objects; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A reference to a field in an input, identified by its index, name and {@link DataType}; equality and hashing use all three. */ +public class FieldRef implements Serializable { + + private static final long serialVersionUID = 4982103776651292199L; + + private final int index; + private final String name; + private final DataType type; + + public FieldRef(int index, String name, DataType type) { + this.index = index; + this.name = name; + this.type = type; + } + + public int index() { + return index; + } + + public String name() { + return name; + } + + public DataType type() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FieldRef fieldRef = (FieldRef) o; + return index == fieldRef.index + && Objects.equals(name, fieldRef.name) + && Objects.equals(type, fieldRef.type); + } + + @Override + public int hashCode() { + return Objects.hash(index, name, type); + } + + @Override + public String toString() { + return "FieldRef{" + "index=" + index + ", name='" + name + '\'' + ", type=" + type + '}'; + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java new file mode 100644 index 000000000..806f2de37 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import java.util.List; +import java.util.stream.Collectors; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link PredicateVisitor} to visit functions. 
*/ public interface FunctionVisitor<T> extends PredicateVisitor<T> { + + @Override + default T visit(LeafPredicate predicate) { + return predicate.function().visit(this, predicate.fieldRef(), predicate.literals()); /* the leaf function dispatches to its own visitXxx callback */ + } + + @Override + default T visit(CompoundPredicate predicate) { + return predicate + .function() + .visit( + this, + predicate.children().stream() + .map(p -> p.visit(this)) + .collect(Collectors.toList())); /* children are visited first; their results feed the compound function */ + } + + // ----------------- Unary functions ------------------------ + + T visitIsNotNull(FieldRef fieldRef); + + T visitIsNull(FieldRef fieldRef); + + // ----------------- Binary functions ------------------------ + + T visitStartsWith(FieldRef fieldRef, Object literal); + + T visitEndsWith(FieldRef fieldRef, Object literal); + + T visitContains(FieldRef fieldRef, Object literal); + + T visitLessThan(FieldRef fieldRef, Object literal); + + T visitGreaterOrEqual(FieldRef fieldRef, Object literal); + + T visitNotEqual(FieldRef fieldRef, Object literal); + + T visitLessOrEqual(FieldRef fieldRef, Object literal); + + T visitEqual(FieldRef fieldRef, Object literal); + + T visitGreaterThan(FieldRef fieldRef, Object literal); + + // ----------------- Other functions ------------------------ + + T visitIn(FieldRef fieldRef, List<Object> literals); + + T visitNotIn(FieldRef fieldRef, List<Object> literals); + + // ----------------- Compound functions ------------------------ + + T visitAnd(List<T> children); + + T visitOr(List<T> children); +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java new file mode 100644 index 000000000..0a915355f --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link NullFalseLeafBinaryFunction} to eval greater or equal. 
*/ public class GreaterOrEqual extends NullFalseLeafBinaryFunction { + + public static final GreaterOrEqual INSTANCE = new GreaterOrEqual(); + + private GreaterOrEqual() {} + + @Override + public boolean test(DataType type, Object field, Object literal) { + return compareLiteral(type, literal, field) <= 0; /* literal <= field, i.e. field >= literal */ + } + + @Override + public boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) { + return compareLiteral(type, literal, max) <= 0; /* some value can be >= literal only if max >= literal */ + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(LessThan.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitGreaterOrEqual(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java new file mode 100644 index 000000000..f92c84c9b --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link NullFalseLeafBinaryFunction} to eval greater than. */ +public class GreaterThan extends NullFalseLeafBinaryFunction { + + public static final GreaterThan INSTANCE = new GreaterThan(); + + private GreaterThan() {} + + @Override + public boolean test(DataType type, Object field, Object literal) { + return compareLiteral(type, literal, field) < 0; /* literal < field, i.e. field > literal */ + } + + @Override + public boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) { + return compareLiteral(type, literal, max) < 0; /* some value can be > literal only if max > literal */ + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(LessOrEqual.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitGreaterThan(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java new file mode 100644 index 000000000..672a1c797 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link LeafFunction} to eval in. */ +public class In extends LeafFunction { + + private static final long serialVersionUID = -9115697441080586485L; + + public static final In INSTANCE = new In(); + + private In() {} + + @Override + public boolean test(DataType type, Object field, List<Object> literals) { + if (field == null) { + return false; /* a null field never matches */ + } + for (Object literal : literals) { + if (literal != null && compareLiteral(type, literal, field) == 0) { /* null literals are skipped */ + return true; + } + } + return false; + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List<Object> literals) { + if (nullCount != null && rowCount == nullCount) { + return false; /* every value is null, so none can match */ + } + for (Object literal : literals) { + if (literal != null + && compareLiteral(type, literal, min) >= 0 + && compareLiteral(type, literal, max) <= 0) { + return true; /* this literal lies within [min, max] */ + } + } + return false; + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(NotIn.INSTANCE); + } + + @Override + public <T> T
visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitIn(fieldRef, literals); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java new file mode 100644 index 000000000..6249306b5 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link LeafUnaryFunction} to eval is not null.
*/ +public class IsNotNull extends LeafUnaryFunction { + + public static final IsNotNull INSTANCE = new IsNotNull(); + + private IsNotNull() {} + + @Override + public boolean test(DataType type, Object field) { + return field != null; + } + + @Override + public boolean test(DataType type, long rowCount, Object min, Object max, Long nullCount) { + return nullCount == null || nullCount < rowCount; + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(IsNull.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitIsNotNull(fieldRef); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java new file mode 100644 index 000000000..d97c8cb43 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link LeafUnaryFunction} to eval is null. */ +public class IsNull extends LeafUnaryFunction { + + public static final IsNull INSTANCE = new IsNull(); + + private IsNull() {} + + @Override + public boolean test(DataType type, Object field) { + return field == null; + } + + @Override + public boolean test(DataType type, long rowCount, Object min, Object max, Long nullCount) { + return nullCount == null || nullCount > 0; + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(IsNotNull.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitIsNull(fieldRef); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java new file mode 100644 index 000000000..a897c902e --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Function to test a field with literals. */ +public abstract class LeafFunction implements Serializable { + + public abstract boolean test(DataType type, Object field, List<Object> literals); + + public abstract boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List<Object> literals); + + public abstract Optional<LeafFunction> negate(); + + @Override + public int hashCode() { + return this.getClass().getName().hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } + + public abstract <T> T visit( + FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals); + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java new file mode 100644 index 000000000..343e6c0f3 --- /dev/null +++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DecimalType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.TimestampType; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Leaf node of a {@link Predicate} tree. Compares a field in the row with literals. 
*/ +public class LeafPredicate implements Predicate { + + private static final long serialVersionUID = -9033842253303772188L; + + private final LeafFunction function; + private final DataType type; + private final int fieldIndex; + private final String fieldName; + + private List<Object> literals; + + public LeafPredicate( + LeafFunction function, + DataType type, + int fieldIndex, + String fieldName, + List<Object> literals) { + this.function = function; + this.type = type; + this.fieldIndex = fieldIndex; + this.fieldName = fieldName; + this.literals = literals; + } + + public LeafFunction function() { + return function; + } + + public DataType type() { + return type; + } + + public int index() { + return fieldIndex; + } + + public String fieldName() { + return fieldName; + } + + public FieldRef fieldRef() { + return new FieldRef(fieldIndex, fieldName, type); + } + + public List<Object> literals() { + return literals; + } + + public LeafPredicate copyWithNewIndex(int fieldIndex) { + return new LeafPredicate(function, type, fieldIndex, fieldName, literals); + } + + @Override + public boolean test(InternalRow row) { + return function.test(type, get(row, fieldIndex, type), literals); + } + + @Override + public Optional<Predicate> negate() { + return function.negate() + .map(negate -> new LeafPredicate(negate, type, fieldIndex, fieldName, literals)); + } + + @Override + public <T> T visit(PredicateVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LeafPredicate that = (LeafPredicate) o; + return fieldIndex == that.fieldIndex + && Objects.equals(fieldName, that.fieldName) + && Objects.equals(function, that.function) + && Objects.equals(type, that.type) + && Objects.equals(literals, that.literals); + } + + @Override + public int hashCode() { + return Objects.hash(function, type, fieldIndex, fieldName, 
literals); + } + + @Override + public String toString() { + String literalsStr; + if (literals == null || literals.isEmpty()) { + literalsStr = ""; + } else if (literals.size() == 1) { + literalsStr = Objects.toString(literals.get(0)); + } else { + literalsStr = literals.toString(); + } + return literalsStr.isEmpty() + ? function + "(" + fieldName + ")" + : function + "(" + fieldName + ", " + literalsStr + ")"; + } + + public static Object get(InternalRow internalRow, int pos, DataType fieldType) { + if (internalRow.isNullAt(pos)) { + return null; + } + switch (fieldType.getTypeRoot()) { + case BOOLEAN: + return internalRow.getBoolean(pos); + case TINYINT: + return internalRow.getByte(pos); + case SMALLINT: + return internalRow.getShort(pos); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return internalRow.getInt(pos); + case BIGINT: + return internalRow.getLong(pos); + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) fieldType; + return internalRow.getTimestampNtz(pos, timestampType.getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType lzTs = (LocalZonedTimestampType) fieldType; + return internalRow.getTimestampLtz(pos, lzTs.getPrecision()); + case FLOAT: + return internalRow.getFloat(pos); + case DOUBLE: + return internalRow.getDouble(pos); + case CHAR: + case STRING: + return internalRow.getString(pos); + case DECIMAL: + DecimalType decimalType = (DecimalType) fieldType; + return internalRow.getDecimal( + pos, decimalType.getPrecision(), decimalType.getScale()); + case BINARY: + return internalRow.getBytes(pos); + default: + throw new UnsupportedOperationException("Unsupported type: " + fieldType); + } + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java new file mode 100644 index 000000000..b10fe40ff --- /dev/null +++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Function to test a field. 
*/ +public abstract class LeafUnaryFunction extends LeafFunction { + + private static final long serialVersionUID = -155104972966998013L; + + public abstract boolean test(DataType type, Object value); + + public abstract boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount); + + @Override + public boolean test(DataType type, Object value, List<Object> literals) { + return test(type, value); + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List<Object> literals) { + return test(type, rowCount, min, max, nullCount); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java new file mode 100644 index 000000000..45dd3fda6 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link NullFalseLeafBinaryFunction} to eval less or equal. */ +public class LessOrEqual extends NullFalseLeafBinaryFunction { + + public static final LessOrEqual INSTANCE = new LessOrEqual(); + + private LessOrEqual() {} + + @Override + public boolean test(DataType type, Object field, Object literal) { + return compareLiteral(type, literal, field) >= 0; + } + + @Override + public boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) { + return compareLiteral(type, literal, min) >= 0; + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(GreaterThan.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitLessOrEqual(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java new file mode 100644 index 000000000..ed1ee70c3 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link NullFalseLeafBinaryFunction} to eval less than.
*/ +public class LessThan extends NullFalseLeafBinaryFunction { + + public static final LessThan INSTANCE = new LessThan(); + + private LessThan() {} + + @Override + public boolean test(DataType type, Object field, Object literal) { + return compareLiteral(type, literal, field) > 0; + } + + @Override + public boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) { + return compareLiteral(type, literal, min) > 0; + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(GreaterOrEqual.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitLessThan(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java new file mode 100644 index 000000000..bca203a22 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link NullFalseLeafBinaryFunction} to eval not equal. */ +public class NotEqual extends NullFalseLeafBinaryFunction { + + public static final NotEqual INSTANCE = new NotEqual(); + + private NotEqual() {} + + @Override + public boolean test(DataType type, Object field, Object literal) { + return compareLiteral(type, literal, field) != 0; + } + + @Override + public boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) { + return compareLiteral(type, literal, min) != 0 || compareLiteral(type, literal, max) != 0; + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(Equal.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitNotEqual(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java new file mode 100644 index 000000000..3bad7e56e --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link LeafFunction} to eval not in. 
*/ +public class NotIn extends LeafFunction { + + private static final long serialVersionUID = 8953845894700582887L; + public static final NotIn INSTANCE = new NotIn(); + + private NotIn() {} + + @Override + public boolean test(DataType type, Object field, List<Object> literals) { + if (field == null) { + return false; + } + for (Object literal : literals) { + if (literal == null || compareLiteral(type, literal, field) == 0) { + return false; + } + } + return true; + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List<Object> literals) { + if (nullCount != null && rowCount == nullCount) { + return false; + } + for (Object literal : literals) { + if (literal == null + || (compareLiteral(type, literal, min) == 0 + && compareLiteral(type, literal, max) == 0)) { + return false; + } + } + return true; + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.of(In.INSTANCE); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitNotIn(fieldRef, literals); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java new file mode 100644 index 000000000..2dfdf22d4 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Function to test a field with a literal. */ +public abstract class NullFalseLeafBinaryFunction extends LeafFunction { + + private static final long serialVersionUID = 5617091663961558170L; + + public abstract boolean test(DataType type, Object field, Object literal); + + public abstract boolean test( + DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal); + + @Override + public boolean test(DataType type, Object field, List<Object> literals) { + if (field == null || literals.get(0) == null) { + return false; + } + return test(type, field, literals.get(0)); + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List<Object> literals) { + if (nullCount != null) { + if (rowCount == nullCount || literals.get(0) == null) { + return false; + } + } + return test(type, rowCount, min, max, nullCount, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java new file mode 100644 index 000000000..744717435 --- /dev/null +++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.InternalRow; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A {@link CompoundPredicate.Function} to eval or. 
*/ +public class Or extends CompoundPredicate.Function { + + private static final long serialVersionUID = -2110346319473699418L; + + public static final Or INSTANCE = new Or(); + + private Or() {} + + @Override + public boolean test(InternalRow row, List<Predicate> children) { + for (Predicate child : children) { + if (child.test(row)) { + return true; + } + } + return false; + } + + @Override + public Optional<Predicate> negate(List<Predicate> children) { + List<Predicate> negatedChildren = new ArrayList<>(); + for (Predicate child : children) { + Optional<Predicate> negatedChild = child.negate(); + if (negatedChild.isPresent()) { + negatedChildren.add(negatedChild.get()); + } else { + return Optional.empty(); + } + } + return Optional.of(new CompoundPredicate(And.INSTANCE, negatedChildren)); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, List<T> children) { + return visitor.visitOr(children); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java new file mode 100644 index 000000000..6318aba9c --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import java.util.List; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Visit the predicate and check if it only contains partition key's predicate. */ +public class PartitionPredicateVisitor implements PredicateVisitor<Boolean> { + + private final List<String> partitionKeys; + + public PartitionPredicateVisitor(List<String> partitionKeys) { + this.partitionKeys = partitionKeys; + } + + @Override + public Boolean visit(LeafPredicate predicate) { + return partitionKeys.contains(predicate.fieldName()); + } + + @Override + public Boolean visit(CompoundPredicate predicate) { + for (Predicate child : predicate.children()) { + Boolean matched = child.visit(this); + + if (!matched) { + return false; + } + } + return true; + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java new file mode 100644 index 000000000..7083a33d5 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.InternalRow; + +import java.io.Serializable; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Predicate which returns Boolean and provides testing by stats. + * + * @see PredicateBuilder + * @since 0.4.0 + */ +public interface Predicate extends Serializable { + + /** + * Now only support test based on the specific input row. Todo: boolean test(long rowCount, + * InternalRow minValues, InternalRow maxValues, InternalArray nullCounts); Test based on the + * specific input row. + * + * @return return true when hit, false when not hit. + */ + boolean test(InternalRow row); + + /** @return the negation predicate of this predicate if possible. */ + Optional<Predicate> negate(); + + <T> T visit(PredicateVisitor<T> visitor); +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java new file mode 100644 index 000000000..975954c2a --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataField; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DecimalType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.utils.TypeUtils; + +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.utils.Preconditions.checkArgument; +import static java.util.Collections.singletonList; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software 
Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * A utility class to create {@link Predicate} objects for common filter conditions. + * + * @since 0.4.0 + */ +public class PredicateBuilder { + + private final RowType rowType; + private final List<String> fieldNames; + + public PredicateBuilder(RowType rowType) { + this.rowType = rowType; + this.fieldNames = rowType.getFieldNames(); + } + + public int indexOf(String field) { + return fieldNames.indexOf(field); + } + + public Predicate equal(int idx, Object literal) { + return leaf(Equal.INSTANCE, idx, literal); + } + + public Predicate notEqual(int idx, Object literal) { + return leaf(NotEqual.INSTANCE, idx, literal); + } + + public Predicate lessThan(int idx, Object literal) { + return leaf(LessThan.INSTANCE, idx, literal); + } + + public Predicate lessOrEqual(int idx, Object literal) { + return leaf(LessOrEqual.INSTANCE, idx, literal); + } + + public Predicate greaterThan(int idx, Object literal) { + return leaf(GreaterThan.INSTANCE, idx, literal); + } + + public Predicate greaterOrEqual(int idx, Object literal) { + return leaf(GreaterOrEqual.INSTANCE, idx, literal); + } + + public Predicate isNull(int idx) { + return leaf(IsNull.INSTANCE, idx); + } + + public Predicate isNotNull(int idx) { + return leaf(IsNotNull.INSTANCE, idx); + } + + public Predicate startsWith(int idx, Object patternLiteral) { + return leaf(StartsWith.INSTANCE, idx, patternLiteral); + } + + public Predicate endsWith(int idx, Object patternLiteral) { + return leaf(EndsWith.INSTANCE, idx, patternLiteral); + } + + public Predicate contains(int idx, Object patternLiteral) { + return leaf(Contains.INSTANCE, idx, patternLiteral); + } + + public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) { + validateIndex(idx); + DataField field = rowType.getFields().get(idx); + return new LeafPredicate(
+ function, field.getType(), idx, field.getName(), singletonList(literal)); + } + + public Predicate leaf(LeafUnaryFunction function, int idx) { + validateIndex(idx); + DataField field = rowType.getFields().get(idx); + return new LeafPredicate( + function, field.getType(), idx, field.getName(), Collections.emptyList()); + } + + public Predicate in(int idx, List<Object> literals) { + validateIndex(idx); + // Small IN lists are expanded into an OR of equality predicates; above 20 literals + // that expansion degrades performance, so a single IN leaf predicate is used instead. + if (literals.size() > 20) { + DataField field = rowType.getFields().get(idx); + return new LeafPredicate(In.INSTANCE, field.getType(), idx, field.getName(), literals); + } + + List<Predicate> equals = new ArrayList<>(literals.size()); + for (Object literal : literals) { + equals.add(equal(idx, literal)); + } + return or(equals); + } + + public Predicate notIn(int idx, List<Object> literals) { + return in(idx, literals).negate().get(); + } + + public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) { + return new CompoundPredicate( + And.INSTANCE, + Arrays.asList( + greaterOrEqual(idx, includedLowerBound), + lessOrEqual(idx, includedUpperBound))); + } + + public static Predicate and(Predicate... predicates) { + return and(Arrays.asList(predicates)); + } + + public static Predicate and(List<Predicate> predicates) { + checkArgument( + predicates.size() > 0, + "There must be at least 1 inner predicate to construct an AND predicate"); + if (predicates.size() == 1) { + return predicates.get(0); + } + return predicates.stream() + .reduce((a, b) -> new CompoundPredicate(And.INSTANCE, Arrays.asList(a, b))) + .get(); + } + + @Nullable + public static Predicate andNullable(Predicate...
predicates) { + return andNullable(Arrays.asList(predicates)); + } + + @Nullable + public static Predicate andNullable(List<Predicate> predicates) { + predicates = predicates.stream().filter(Objects::nonNull).collect(Collectors.toList()); + if (predicates.isEmpty()) { + return null; + } + + return and(predicates); + } + + public static Predicate or(Predicate... predicates) { + return or(Arrays.asList(predicates)); + } + + public static Predicate or(List<Predicate> predicates) { + checkArgument( + predicates.size() > 0, + "There must be at least 1 inner predicate to construct an OR predicate"); + return predicates.stream() + .reduce((a, b) -> new CompoundPredicate(Or.INSTANCE, Arrays.asList(a, b))) + .get(); + } + + public static List<Predicate> splitAnd(@Nullable Predicate predicate) { + if (predicate == null) { + return Collections.emptyList(); + } + List<Predicate> result = new ArrayList<>(); + splitCompound(And.INSTANCE, predicate, result); + return result; + } + + public static List<Predicate> splitOr(@Nullable Predicate predicate) { + if (predicate == null) { + return Collections.emptyList(); + } + List<Predicate> result = new ArrayList<>(); + splitCompound(Or.INSTANCE, predicate, result); + return result; + } + + private static void splitCompound( + CompoundPredicate.Function function, Predicate predicate, List<Predicate> result) { + if (predicate instanceof CompoundPredicate + && ((CompoundPredicate) predicate).function().equals(function)) { + for (Predicate child : ((CompoundPredicate) predicate).children()) { + splitCompound(function, child, result); + } + } else { + result.add(predicate); + } + } + + private void validateIndex(int idx) { + if (idx < 0 || idx >= rowType.getFieldCount()) { + throw new UnsupportedExpression("idx is not valid"); + } + } + + public static Object convertJavaObject(DataType literalType, Object o) { + if (o == null) { + return null; + } + switch (literalType.getTypeRoot()) { + case BOOLEAN: + return o; + case BIGINT: + return
((Number) o).longValue(); + case DOUBLE: + return ((Number) o).doubleValue(); + case TINYINT: + return ((Number) o).byteValue(); + case SMALLINT: + return ((Number) o).shortValue(); + case INTEGER: + return ((Number) o).intValue(); + case FLOAT: + return ((Number) o).floatValue(); + case STRING: + return BinaryString.fromString(o.toString()); + case DATE: + // Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to Date + // Which uses `java.util.Date()` internally to create the object and that uses the + // TimeZone.getDefaultRef() + // To get back the expected date we have to use the LocalDate which gets rid of the + // TimeZone misery as it uses the year/month/day to generate the object + LocalDate localDate; + if (o instanceof java.sql.Timestamp) { + localDate = ((java.sql.Timestamp) o).toLocalDateTime().toLocalDate(); + } else if (o instanceof Date) { + localDate = ((Date) o).toLocalDate(); + } else if (o instanceof LocalDate) { + localDate = (LocalDate) o; + } else { + throw new UnsupportedOperationException( + "Unexpected date literal of class " + o.getClass().getName()); + } + LocalDate epochDay = + Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).toLocalDate(); + return (int) ChronoUnit.DAYS.between(epochDay, localDate); + case TIME_WITHOUT_TIME_ZONE: + LocalTime localTime; + if (o instanceof java.sql.Time) { + localTime = ((java.sql.Time) o).toLocalTime(); + } else if (o instanceof LocalTime) { + localTime = (LocalTime) o; + } else { + throw new UnsupportedOperationException( + "Unexpected time literal of class " + o.getClass().getName()); + } + // return millis of a day + return (int) (localTime.toNanoOfDay() / 1_000_000); + case DECIMAL: + DecimalType decimalType = (DecimalType) literalType; + int precision = decimalType.getPrecision(); + int scale = decimalType.getScale(); + return Decimal.fromBigDecimal((BigDecimal) o, precision, scale); + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (o instanceof java.sql.Timestamp) { + LocalDateTime
localDateTime = + ((Timestamp) o) + .toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + return TimestampNtz.fromLocalDateTime(localDateTime); + } else if (o instanceof Instant) { + Instant o1 = (Instant) o; + LocalDateTime dateTime = o1.atZone(ZoneId.systemDefault()).toLocalDateTime(); + return TimestampNtz.fromLocalDateTime(dateTime); + } else if (o instanceof LocalDateTime) { + return TimestampNtz.fromLocalDateTime((LocalDateTime) o); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported class %s for timestamp without timezone ", + o.getClass())); + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (o instanceof java.sql.Timestamp) { + java.sql.Timestamp timestamp = (java.sql.Timestamp) o; + return TimestampLtz.fromInstant(timestamp.toInstant()); + } else if (o instanceof Instant) { + return TimestampLtz.fromInstant((Instant) o); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported class %s for timestamp with local time zone ", + o.getClass())); + } + default: + throw new UnsupportedOperationException( + "Unsupported predicate leaf type " + literalType.getTypeRoot().name()); + } + } + + public static List<Predicate> pickTransformFieldMapping( + List<Predicate> predicates, List<String> inputFields, List<String> pickedFields) { + return pickTransformFieldMapping( + predicates, inputFields.stream().mapToInt(pickedFields::indexOf).toArray()); + } + + public static List<Predicate> pickTransformFieldMapping( + List<Predicate> predicates, int[] fieldIdxMapping) { + List<Predicate> pick = new ArrayList<>(); + for (Predicate p : predicates) { + Optional<Predicate> mapped = transformFieldMapping(p, fieldIdxMapping); + mapped.ifPresent(pick::add); + } + return pick; + } + + public static Optional<Predicate> transformFieldMapping( + Predicate predicate, int[] fieldIdxMapping) { + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+ List<Predicate> children = new ArrayList<>(); + for (Predicate child : compoundPredicate.children()) { + Optional<Predicate> mapped = transformFieldMapping(child, fieldIdxMapping); + if (mapped.isPresent()) { + children.add(mapped.get()); + } else { + return Optional.empty(); + } + } + return Optional.of(new CompoundPredicate(compoundPredicate.function(), children)); + } else { + LeafPredicate leafPredicate = (LeafPredicate) predicate; + int mapped = fieldIdxMapping[leafPredicate.index()]; + if (mapped >= 0) { + return Optional.of( + new LeafPredicate( + leafPredicate.function(), + leafPredicate.type(), + mapped, + leafPredicate.fieldName(), + leafPredicate.literals())); + } else { + return Optional.empty(); + } + } + } + + public static boolean containsFields(Predicate predicate, Set<String> fields) { + if (predicate instanceof CompoundPredicate) { + for (Predicate child : ((CompoundPredicate) predicate).children()) { + if (containsFields(child, fields)) { + return true; + } + } + return false; + } else { + LeafPredicate leafPredicate = (LeafPredicate) predicate; + return fields.contains(leafPredicate.fieldName()); + } + } + + public static List<Predicate> excludePredicateWithFields( + @Nullable List<Predicate> predicates, Set<String> fields) { + if (predicates == null || predicates.isEmpty() || fields.isEmpty()) { + return predicates; + } + return predicates.stream() + .filter(f -> !containsFields(f, fields)) + .collect(Collectors.toList()); + } + + @Nullable + public static Predicate partition( + Map<String, String> map, RowType rowType, String defaultPartValue) { + Map<String, Object> internalValues = convertSpecToInternal(map, rowType, defaultPartValue); + List<String> fieldNames = rowType.getFieldNames(); + Predicate predicate = null; + PredicateBuilder builder = new PredicateBuilder(rowType); + for (Map.Entry<String, Object> entry : internalValues.entrySet()) { + int idx = fieldNames.indexOf(entry.getKey()); + Object literal =
entry.getValue(); + Predicate predicateTemp = + literal == null ? builder.isNull(idx) : builder.equal(idx, literal); + if (predicate == null) { + predicate = predicateTemp; + } else { + predicate = PredicateBuilder.and(predicate, predicateTemp); + } + } + return predicate; + } + + public static Predicate partitions( + List<Map<String, String>> partitions, RowType rowType, String defaultPartValue) { + return PredicateBuilder.or( + partitions.stream() + .map(p -> PredicateBuilder.partition(p, rowType, defaultPartValue)) + .toArray(Predicate[]::new)); + } + + public static Map<String, Object> convertSpecToInternal( + Map<String, String> spec, RowType partType, String defaultPartValue) { + Map<String, Object> partValues = new LinkedHashMap<>(); + for (Map.Entry<String, String> entry : spec.entrySet()) { + partValues.put( + entry.getKey(), + defaultPartValue.equals(entry.getValue()) + ? null + : TypeUtils.castFromString( + entry.getValue(), partType.getField(entry.getKey()).getType())); + } + return partValues; + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java new file mode 100644 index 000000000..3e20e9c92 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A visitor over a {@link Predicate} tree, with callbacks for leaf and compound predicates. */ +public interface PredicateVisitor<T> { + + T visit(LeafPredicate predicate); + + T visit(CompoundPredicate predicate); +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java new file mode 100644 index 000000000..e3baa41fa --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.types.DataType; + +import java.util.List; +import java.util.Optional; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * A {@link NullFalseLeafBinaryFunction} that evaluates a prefix match such as {@code filter like + * 'abc%'}, comparing the {@code toString()} forms of the field and the pattern literal. + */ +public class StartsWith extends NullFalseLeafBinaryFunction { + + public static final StartsWith INSTANCE = new StartsWith(); + + private StartsWith() {} + + @Override + public boolean test(DataType type, Object field, Object patternLiteral) { + String fieldString = field.toString(); + return fieldString.startsWith(patternLiteral.toString()); + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + Object patternLiteral) { + String minStr = min.toString(); + String maxStr = max.toString(); + String pattern = patternLiteral.toString(); + return (minStr.startsWith(pattern) || minStr.compareTo(pattern) <= 0) + && (maxStr.startsWith(pattern) || maxStr.compareTo(pattern) >= 0); + } + + @Override + public Optional<LeafFunction> negate() { + return Optional.empty(); + } + + @Override + public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) { + return visitor.visitStartsWith(fieldRef, literals.get(0)); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/UnsupportedExpression.java b/fluss-common/src/main/java/com/alibaba/fluss/predicate/UnsupportedExpression.java new file mode 100644 index 000000000..74b50f2f4 --- /dev/null +++
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/UnsupportedExpression.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +/** Thrown on an unsupported expression; the caller may choose to ignore this filter branch.
*/ +public class UnsupportedExpression extends RuntimeException { + public UnsupportedExpression(String message) { + super(message); + } + + public UnsupportedExpression() { + super(); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java b/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java index 425074482..840d0a86b 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java @@ -255,4 +255,14 @@ public final class RowType extends DataType { return new RowType(isNullable, fields); } } + + public DataField getField(String fieldName) { + for (DataField field : fields) { + if (field.getName().equals(fieldName)) { + return field; + } + } + + throw new IllegalArgumentException("Cannot find field: " + fieldName); + } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java index ea4ad277b..78c792920 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java @@ -18,10 +18,12 @@ package com.alibaba.fluss.utils; import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.TimestampLtz; import com.alibaba.fluss.row.TimestampNtz; import java.time.DateTimeException; import java.util.List; +import java.util.TimeZone; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -73,4 +75,10 @@ public class BinaryStringUtils { throws DateTimeException { return DateTimeUtils.parseTimestampData(input.toString(), precision); } + + /** Used by {@code CAST(x as TIMESTAMPLTZ)}.
*/ + public static TimestampLtz toTimestampltz(BinaryString input, int precision, TimeZone timeZone) + throws DateTimeException { + return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone); + } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java index 6e37bdac6..bf27a06d3 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.utils; +import com.alibaba.fluss.row.TimestampLtz; import com.alibaba.fluss.row.TimestampNtz; import java.time.DateTimeException; @@ -26,6 +27,7 @@ import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.TemporalAccessor; +import java.util.TimeZone; import static java.time.temporal.ChronoField.DAY_OF_MONTH; import static java.time.temporal.ChronoField.HOUR_OF_DAY; @@ -109,6 +111,14 @@ public class DateTimeUtils { fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision)); } + public static TimestampLtz parseTimestampData(String dateStr, int precision, TimeZone timeZone) + throws DateTimeException { + return TimestampLtz.fromInstant( + fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision) + .atZone(timeZone.toZoneId()) + .toInstant()); + } + public static Integer parseDate(String s) { // allow timestamp str to date, e.g.
2017-12-12 09:30:00.0 int ws1 = s.indexOf(" "); diff --git a/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateBuilderTest.java b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateBuilderTest.java new file mode 100644 index 000000000..acb193fe3 --- /dev/null +++ b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateBuilderTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PredicateBuilder}.
*/ +public class PredicateBuilderTest { + + @Test + public void testBetween() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.between(0, 1, 3); + + assertThat(predicate.test(GenericRow.of(1))).isTrue(); + assertThat(predicate.test(GenericRow.of(2))).isTrue(); + assertThat(predicate.test(GenericRow.of(3))).isTrue(); + assertThat(predicate.test(GenericRow.of(4))).isFalse(); + assertThat(predicate.test(GenericRow.of((Object) null))).isFalse(); + } + + @Test + public void testBetweenNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.between(0, 1, null); + // A null upper bound can never be satisfied, so the range matches no row. + assertThat(predicate.test(GenericRow.of(1))).isFalse(); + assertThat(predicate.test(GenericRow.of(2))).isFalse(); + assertThat(predicate.test(GenericRow.of(3))).isFalse(); + assertThat(predicate.test(GenericRow.of(4))).isFalse(); + assertThat(predicate.test(GenericRow.of((Object) null))).isFalse(); + } + + @Test + public void testSplitAnd() { + PredicateBuilder builder = + new PredicateBuilder( + RowType.of( + new IntType(), + new IntType(), + new IntType(), + new IntType(), + new IntType(), + new IntType(), + new IntType())); + + Predicate child1 = + PredicateBuilder.or(builder.isNull(0), builder.isNull(1), builder.isNull(2)); + Predicate child2 = + PredicateBuilder.and(builder.isNull(3), builder.isNull(4), builder.isNull(5)); + Predicate child3 = builder.isNull(6); + // Nested ANDs are flattened while the OR child is kept as a single conjunct. + assertThat(PredicateBuilder.splitAnd(PredicateBuilder.and(child1, child2, child3))) + .isEqualTo( + Arrays.asList( + child1, + builder.isNull(3), + builder.isNull(4), + builder.isNull(5), + child3)); + } +} diff --git a/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateTest.java b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateTest.java new file mode 100644 index 000000000..427fec8a8 --- /dev/null +++
b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateTest.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.predicate; + +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.StringType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static com.alibaba.fluss.row.BinaryString.fromString; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link Predicate}s. 
*/ +public class PredicateTest { + + @Test + public void testEqual() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.equal(0, 5); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(5))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.notEqual(0, 5)); + } + + @Test + public void testEqualNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.equal(0, null); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testNotEqual() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.notEqual(0, 5); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(5))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.equal(0, 5)); + } + + @Test + public void testNotEqualNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.notEqual(0, null); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testGreater() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.greaterThan(0, 5); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(5))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(6))).isEqualTo(true); + 
assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessOrEqual(0, 5)); + } + + @Test + public void testGreaterNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.greaterThan(0, null); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testGreaterOrEqual() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.greaterOrEqual(0, 5); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(5))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(6))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessThan(0, 5)); + } + + @Test + public void testGreaterOrEqualNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.greaterOrEqual(0, null); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testLess() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.lessThan(0, 5); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(5))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(6))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterOrEqual(0, 5)); + } + + @Test + public void testLessNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new 
IntType())); + Predicate predicate = builder.lessThan(0, null); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testLessOrEqual() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.lessOrEqual(0, 5); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(5))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(6))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterThan(0, 5)); + } + + @Test + public void testLessOrEqualNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.lessOrEqual(0, null); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testIsNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.isNull(0); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(true); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNotNull(0)); + } + + @Test + public void testIsNotNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.isNotNull(0); + + assertThat(predicate.test(GenericRow.of(4))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNull(0)); + } + + @Test + public void testIn() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = 
builder.in(0, Arrays.asList(1, 3)); + assertThat(predicate).isInstanceOf(CompoundPredicate.class); + + assertThat(predicate.test(GenericRow.of(1))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testInNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.in(0, Arrays.asList(1, null, 3)); + assertThat(predicate).isInstanceOf(CompoundPredicate.class); + + assertThat(predicate.test(GenericRow.of(1))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testNotIn() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.notIn(0, Arrays.asList(1, 3)); + assertThat(predicate).isInstanceOf(CompoundPredicate.class); + + assertThat(predicate.test(GenericRow.of(1))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testNotInNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.notIn(0, Arrays.asList(1, null, 3)); + assertThat(predicate).isInstanceOf(CompoundPredicate.class); + + assertThat(predicate.test(GenericRow.of(1))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testEndsWith() { + 
PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType())); + Predicate predicate = builder.endsWith(0, ("bcc")); + GenericRow row = GenericRow.of(fromString("aabbcc")); + + assertThat(predicate.test(row)).isEqualTo(true); + } + + @Test + public void testStartWith() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType())); + Predicate predicate = builder.startsWith(0, ("aab")); + GenericRow row = GenericRow.of(fromString("aabbcc")); + + assertThat(predicate.test(row)).isEqualTo(true); + } + + @Test + public void testContainsWith() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType())); + Predicate predicate = builder.contains(0, ("def")); + GenericRow row1 = GenericRow.of(fromString("aabbdefcc")); + GenericRow row2 = GenericRow.of(fromString("aabbdcefcc")); + + assertThat(predicate.test(row1)).isEqualTo(true); + assertThat(predicate.test(row2)).isEqualTo(false); + } + + @Test + public void testLargeIn() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + List<Object> literals = new ArrayList<>(); + literals.add(1); + literals.add(3); + for (int i = 10; i < 30; i++) { + literals.add(i); + } + Predicate predicate = builder.in(0, literals); + assertThat(predicate).isInstanceOf(LeafPredicate.class); + + assertThat(predicate.test(GenericRow.of(1))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testLargeInNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + List<Object> literals = new ArrayList<>(); + literals.add(1); + literals.add(null); + literals.add(3); + for (int i = 10; i < 30; i++) { + literals.add(i); + } + Predicate predicate = builder.in(0, literals); + assertThat(predicate).isInstanceOf(LeafPredicate.class); + + 
assertThat(predicate.test(GenericRow.of(1))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testLargeNotIn() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + List<Object> literals = new ArrayList<>(); + literals.add(1); + literals.add(3); + for (int i = 10; i < 30; i++) { + literals.add(i); + } + Predicate predicate = builder.notIn(0, literals); + assertThat(predicate).isInstanceOf(LeafPredicate.class); + + assertThat(predicate.test(GenericRow.of(1))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testLargeNotInNull() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + List<Object> literals = new ArrayList<>(); + literals.add(1); + literals.add(null); + literals.add(3); + for (int i = 10; i < 30; i++) { + literals.add(i); + } + Predicate predicate = builder.notIn(0, literals); + assertThat(predicate).isInstanceOf(LeafPredicate.class); + + assertThat(predicate.test(GenericRow.of(1))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(2))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of((Object) null))).isEqualTo(false); + } + + @Test + public void testAnd() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType())); + Predicate predicate = PredicateBuilder.and(builder.equal(0, 3), builder.equal(1, 5)); + + assertThat(predicate.test(GenericRow.of(4, 5))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3, 6))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3, 
5))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(null, 5))).isEqualTo(false); + + assertThat(predicate.negate().orElse(null)) + .isEqualTo(PredicateBuilder.or(builder.notEqual(0, 3), builder.notEqual(1, 5))); + } + + @Test + public void testOr() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType())); + Predicate predicate = PredicateBuilder.or(builder.equal(0, 3), builder.equal(1, 5)); + + assertThat(predicate.test(GenericRow.of(4, 6))).isEqualTo(false); + assertThat(predicate.test(GenericRow.of(3, 6))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(3, 5))).isEqualTo(true); + assertThat(predicate.test(GenericRow.of(null, 5))).isEqualTo(true); + + assertThat(predicate.negate().orElse(null)) + .isEqualTo(PredicateBuilder.and(builder.notEqual(0, 3), builder.notEqual(1, 5))); + } + + @Test + public void testUnknownStats() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate predicate = builder.equal(0, 5); + } + + @Test + public void testPredicateToString() { + PredicateBuilder builder1 = new PredicateBuilder(RowType.of(new IntType())); + Predicate p1 = builder1.equal(0, 5); + assertThat(p1.toString()).isEqualTo("Equal(f0, 5)"); + + PredicateBuilder builder2 = new PredicateBuilder(RowType.of(new IntType())); + Predicate p2 = builder2.greaterThan(0, 5); + assertThat(p2.toString()).isEqualTo("GreaterThan(f0, 5)"); + + PredicateBuilder builder3 = new PredicateBuilder(RowType.of(new IntType(), new IntType())); + Predicate p3 = PredicateBuilder.and(builder3.equal(0, 3), builder3.equal(1, 5)); + assertThat(p3.toString()).isEqualTo("And([Equal(f0, 3), Equal(f1, 5)])"); + + PredicateBuilder builder4 = new PredicateBuilder(RowType.of(new IntType(), new IntType())); + Predicate p4 = PredicateBuilder.or(builder4.equal(0, 3), builder4.equal(1, 5)); + assertThat(p4.toString()).isEqualTo("Or([Equal(f0, 3), Equal(f1, 5)])"); + + PredicateBuilder builder5 = new 
PredicateBuilder(RowType.of(new IntType())); + Predicate p5 = builder5.isNotNull(0); + assertThat(p5.toString()).isEqualTo("IsNotNull(f0)"); + + PredicateBuilder builder6 = new PredicateBuilder(RowType.of(new IntType())); + Predicate p6 = builder6.in(0, Arrays.asList(1, null, 3, 4)); + assertThat(p6.toString()) + .isEqualTo( + "Or([Or([Or([Equal(f0, 1), Equal(f0, null)]), Equal(f0, 3)]), Equal(f0, 4)])"); + + PredicateBuilder builder7 = new PredicateBuilder(RowType.of(new IntType())); + Predicate p7 = builder7.notIn(0, Arrays.asList(1, null, 3, 4)); + assertThat(p7.toString()) + .isEqualTo( + "And([And([And([NotEqual(f0, 1), NotEqual(f0, null)]), NotEqual(f0, 3)]), NotEqual(f0, 4)])"); + + PredicateBuilder builder8 = new PredicateBuilder(RowType.of(new IntType())); + List<Object> literals = new ArrayList<>(); + for (int i = 1; i <= 21; i++) { + literals.add(i); + } + Predicate p8 = builder8.in(0, literals); + assertThat(p8.toString()) + .isEqualTo( + "In(f0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21])"); + + PredicateBuilder builder9 = new PredicateBuilder(RowType.of(new IntType())); + Predicate p9 = builder9.notIn(0, literals); + assertThat(p9.toString()) + .isEqualTo( + "NotIn(f0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21])"); + } +} diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 48288186b..5feb9c8ae 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -322,6 +322,8 @@ <exclude>com.alibaba.fluss.kafka.*</exclude> <!-- exclude for fluss-ci-tools --> <exclude>com.alibaba.fluss.tools.ci.*</exclude> + <!-- end exclude for predicate --> + <exclude>com.alibaba.fluss.predicate.*</exclude> <!-- exclude for dummy class --> <exclude>com.alibaba.fluss.dist.DummyClass</exclude> <exclude>com.alibaba.fluss.flink.DummyClass120</exclude>
