PHILO-HE commented on code in PR #10125:
URL:
https://github.com/apache/incubator-gluten/pull/10125#discussion_r2215588229
##########
gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java:
##########
@@ -80,11 +91,86 @@ public boolean isSupported(RexCall callNode,
RexConversionContext context) {
@Override
public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context)
{
List<TypedExpr> params = getParams(callNode, context);
- // If types are different, align them
- List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
+
Type resultType = getResultType(callNode);
+ if (params.size() == 2) {
+ Type leftType = params.get(0).getReturnType();
+ Type rightType = params.get(1).getReturnType();
+
+ if (leftType instanceof DecimalType || rightType instanceof DecimalType)
{
+ List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
+
+ Type alignedLeftType = alignedParams.get(0).getReturnType();
+ Type alignedRightType = alignedParams.get(1).getReturnType();
+
+ if (alignedLeftType instanceof DecimalType && alignedRightType
instanceof DecimalType) {
+ Type veloxResultType =
+ calculateDecimalResultType(
+ (DecimalType) alignedLeftType, (DecimalType)
alignedRightType, functionName);
+ TypedExpr veloxExpr = new CallTypedExpr(veloxResultType,
alignedParams, functionName);
+
+ return new CallTypedExpr(resultType, List.of(veloxExpr), "cast");
+ }
+ }
+ }
+
+ List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
return new CallTypedExpr(resultType, alignedParams, functionName);
}
+
+ private Type calculateDecimalResultType(
Review Comment:
Document reference link for the calculation.
##########
gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java:
##########
@@ -80,11 +91,86 @@ public boolean isSupported(RexCall callNode,
RexConversionContext context) {
@Override
public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context)
{
List<TypedExpr> params = getParams(callNode, context);
- // If types are different, align them
- List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
+
Type resultType = getResultType(callNode);
+ if (params.size() == 2) {
+ Type leftType = params.get(0).getReturnType();
+ Type rightType = params.get(1).getReturnType();
+
+ if (leftType instanceof DecimalType || rightType instanceof DecimalType)
{
+ List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
+
+ Type alignedLeftType = alignedParams.get(0).getReturnType();
+ Type alignedRightType = alignedParams.get(1).getReturnType();
+
+ if (alignedLeftType instanceof DecimalType && alignedRightType
instanceof DecimalType) {
+ Type veloxResultType =
+ calculateDecimalResultType(
+ (DecimalType) alignedLeftType, (DecimalType)
alignedRightType, functionName);
+ TypedExpr veloxExpr = new CallTypedExpr(veloxResultType,
alignedParams, functionName);
+
+ return new CallTypedExpr(resultType, List.of(veloxExpr), "cast");
+ }
+ }
+ }
+
+ List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
return new CallTypedExpr(resultType, alignedParams, functionName);
}
+
+ private Type calculateDecimalResultType(
+ DecimalType leftType, DecimalType rightType, String operation) {
+ int leftPrecision = leftType.getPrecision();
+ int leftScale = leftType.getScale();
+ int rightPrecision = rightType.getPrecision();
+ int rightScale = rightType.getScale();
+
+ int resultPrecision;
+ int resultScale;
+
+ switch (operation.toLowerCase()) {
+ case "plus":
+ case "add":
+ case "minus":
+ case "subtract":
+ // + -:precision = max(p1-s1, p2-s2) + max(s1, s2) + 1
+ // scale = max(s1, s2)
Review Comment:
// precision = ...
// scale = ...
##########
gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java:
##########
@@ -62,6 +63,16 @@ public DefaultRexCallConverter(String functionName) {
public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context)
{
List<TypedExpr> params = getParams(callNode, context);
Type resultType = getResultType(callNode);
+
+ if ("cast".equals(functionName) && params.size() == 1) {
Review Comment:
cast can only have one input, so no need to check the size.
##########
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:
##########
@@ -127,6 +130,26 @@ private void executeQuery(StreamTableEnvironment tEnv,
String queryFileName) {
if (!insertQuery.isEmpty()) {
TableResult insertResult = tEnv.executeSql(insertQuery);
assertThat(insertResult.getJobClient().isPresent()).isTrue();
+ try {
+ waitForJobCompletion(insertResult, 30000);
+ } catch (Exception e) {
+ throw new RuntimeException("Query execution failed: " + queryFileName,
e);
+ }
+ }
+ }
+
+ private void waitForJobCompletion(TableResult result, long timeoutMs) {
+ if (result.getJobClient().isPresent()) {
+ var jobClint = result.getJobClient().get();
+ try {
+ jobClint.getJobExecutionResult().get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Job timeout after ", e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException("Job execution failed ", e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
Seems the above several catching can be removed. Instead, just declare the
method with `throws InterruptedExceptionException, ExecutionException`
##########
gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java:
##########
@@ -62,6 +63,16 @@ public DefaultRexCallConverter(String functionName) {
public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context)
{
List<TypedExpr> params = getParams(callNode, context);
Type resultType = getResultType(callNode);
+
+ if ("cast".equals(functionName) && params.size() == 1) {
+ TypedExpr sourceExpr = params.get(0);
+ Type sourceType = sourceExpr.getReturnType();
+
+ if (sourceType instanceof TimestampType && resultType instanceof
TimestampType) {
Review Comment:
Can we just check sourceType is as same as resultType?
##########
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:
##########
@@ -127,6 +130,26 @@ private void executeQuery(StreamTableEnvironment tEnv,
String queryFileName) {
if (!insertQuery.isEmpty()) {
TableResult insertResult = tEnv.executeSql(insertQuery);
assertThat(insertResult.getJobClient().isPresent()).isTrue();
+ try {
+ waitForJobCompletion(insertResult, 30000);
+ } catch (Exception e) {
+ throw new RuntimeException("Query execution failed: " + queryFileName,
e);
+ }
+ }
+ }
+
+ private void waitForJobCompletion(TableResult result, long timeoutMs) {
+ if (result.getJobClient().isPresent()) {
+ var jobClint = result.getJobClient().get();
+ try {
+ jobClint.getJobExecutionResult().get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Job timeout after ", e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException("Job execution failed ", e);
Review Comment:
The message seems not consistent with the exception type.
##########
gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java:
##########
@@ -80,11 +91,86 @@ public boolean isSupported(RexCall callNode,
RexConversionContext context) {
@Override
public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context)
{
List<TypedExpr> params = getParams(callNode, context);
- // If types are different, align them
- List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
+
Type resultType = getResultType(callNode);
+ if (params.size() == 2) {
+ Type leftType = params.get(0).getReturnType();
+ Type rightType = params.get(1).getReturnType();
+
+ if (leftType instanceof DecimalType || rightType instanceof DecimalType)
{
+ List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
+
+ Type alignedLeftType = alignedParams.get(0).getReturnType();
+ Type alignedRightType = alignedParams.get(1).getReturnType();
+
+ if (alignedLeftType instanceof DecimalType && alignedRightType
instanceof DecimalType) {
+ Type veloxResultType =
+ calculateDecimalResultType(
+ (DecimalType) alignedLeftType, (DecimalType)
alignedRightType, functionName);
+ TypedExpr veloxExpr = new CallTypedExpr(veloxResultType,
alignedParams, functionName);
+
+ return new CallTypedExpr(resultType, List.of(veloxExpr), "cast");
+ }
+ }
+ }
+
+ List<TypedExpr> alignedParams =
TypeUtils.promoteTypeForArithmeticExpressions(params);
return new CallTypedExpr(resultType, alignedParams, functionName);
}
+
+ private Type calculateDecimalResultType(
+ DecimalType leftType, DecimalType rightType, String operation) {
+ int leftPrecision = leftType.getPrecision();
+ int leftScale = leftType.getScale();
+ int rightPrecision = rightType.getPrecision();
+ int rightScale = rightType.getScale();
+
+ int resultPrecision;
+ int resultScale;
+
+ switch (operation.toLowerCase()) {
Review Comment:
Please confirm whether the function has been normalized to lower case, maybe
on Flink side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]