rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277049
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java:
##########
@@ -246,18 +248,70 @@ private static Optional<Expression>
convertFieldAndLiteral(
org.apache.flink.table.expressions.Expression left = args.get(0);
org.apache.flink.table.expressions.Expression right = args.get(1);
- if (left instanceof FieldReferenceExpression && right instanceof
ValueLiteralExpression) {
- String name = ((FieldReferenceExpression) left).getName();
- Optional<Object> lit = convertLiteral((ValueLiteralExpression) right);
+ Optional<Object> lit;
+ if (left instanceof FieldReferenceExpression) {
+ lit = convertExpression(right);
if (lit.isPresent()) {
- return Optional.of(convertLR.apply(name, lit.get()));
+ return Optional.of(convertLR.apply(((FieldReferenceExpression)
left).getName(), lit.get()));
}
- } else if (left instanceof ValueLiteralExpression
- && right instanceof FieldReferenceExpression) {
- Optional<Object> lit = convertLiteral((ValueLiteralExpression) left);
- String name = ((FieldReferenceExpression) right).getName();
+ } else if (right instanceof FieldReferenceExpression) {
+ lit = convertExpression(left);
if (lit.isPresent()) {
- return Optional.of(convertRL.apply(name, lit.get()));
+ return Optional.of(
+ convertRL.apply(((FieldReferenceExpression) right).getName(),
lit.get()));
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ private static Optional<Object> convertExpression(
+ org.apache.flink.table.expressions.Expression expression) {
+ if (expression instanceof ValueLiteralExpression) {
+ return convertLiteral((ValueLiteralExpression) expression);
+ } else if (expression instanceof CallExpression) {
+ return convertCallExpression((CallExpression) expression);
+ }
+ return Optional.empty();
+ }
+
+ private static Optional<Object> convertCallExpression(CallExpression call) {
+ if (!BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition()))
{
+ return Optional.empty();
+ }
+
+ List<ResolvedExpression> args = call.getResolvedChildren();
+ if (args.size() != 2) {
+ return Optional.empty();
+ }
+
+ org.apache.flink.table.expressions.Expression left = args.get(0);
+ org.apache.flink.table.expressions.Expression right = args.get(1);
+
+ if (left instanceof ValueLiteralExpression && right instanceof
TypeLiteralExpression) {
+ ValueLiteralExpression value = (ValueLiteralExpression) left;
+ TypeLiteralExpression type = (TypeLiteralExpression) right;
+
+ LogicalType logicalType = type.getOutputDataType().getLogicalType();
+
+ Optional<?> result =
value.getValueAs(logicalType.getDefaultConversion());
+ if (result.isPresent()) {
+ return Optional.of(result.get());
+ }
+
+ switch (logicalType.getTypeRoot()) {
+ case DOUBLE:
+ Optional<String> strValue = value.getValueAs(String.class);
+ if (strValue.isPresent()) {
+ return Optional.of(Double.valueOf(strValue.get()));
Review Comment:
I think that this needs to handle `NumberFormatException` in case someone
uses an expression like `cast("fail" as double)`. You could put a try/except
around the switch and return `Optional.empty`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]