Hi,
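You can do something like this: annotate the return type and every BigDecimal argument with an explicit @DataTypeHint, so that Flink gets the fixed precision and scale it is asking for: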

/**
 * Scalar UDF that raises a decimal base to a decimal exponent.
 *
 * <p>Both arguments and the result are declared as {@code DECIMAL(38,18)} through
 * {@code @DataTypeHint}, so the planner is given an explicit precision and scale
 * instead of having to extract them from the bare {@code BigDecimal} class.
 */
public class PowerFunction extends ScalarFunction {

    /**
     * Math context passed to {@code BigDecimalMath.pow}.
     *
     * <p>A {@link MathContext} precision counts significant digits, not digits after
     * the decimal point, so 18 here is not the same thing as the scale of 18 in
     * {@code DECIMAL(38,18)}.
     * NOTE(review): confirm 18 significant digits is enough for the expected value range.
     */
    public static final MathContext mc = new MathContext(18);

    /**
     * Computes {@code x} raised to the power {@code y}.
     *
     * @param x the base; may be {@code null}
     * @param y the exponent; may be {@code null}
     * @return {@code x^y} evaluated with {@link #mc}, or {@code null} when either
     *     argument is {@code null}
     */
    public @DataTypeHint("DECIMAL(38,18)") BigDecimal eval(
            @DataTypeHint("DECIMAL(38,18)") BigDecimal x,
            @DataTypeHint("DECIMAL(38,18)") BigDecimal y) {
        // A NULL operand yields NULL rather than an exception from the math library.
        if (x == null || y == null) {
            return null;
        }
        return BigDecimalMath.pow(x, y, mc);
    }
}

pt., 30 lip 2021 o 05:12 LIU Xiao <liuxiaogen...@gmail.com> napisał(a):
>
> Sorry, there was a small error; the program code should be:
>
> package poc.flink.table;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.AggregateFunction;
> import org.apache.flink.types.Row;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.math.BigDecimal;
> import java.sql.Timestamp;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class PocLastDecimalJob {
>     private static final Logger LOGGER = 
> LoggerFactory.getLogger(PocLastDecimalJob.class);
>
>     public static class LastDecimalAccumulator extends Tuple1<BigDecimal> {
>         public LastDecimalAccumulator(BigDecimal f0) {
>             super(f0);
>         }
>     }
>
>     public static class LastDecimalAggFunction extends 
> AggregateFunction<BigDecimal, LastDecimalAccumulator> {
>
>         @Override
>         public BigDecimal getValue(LastDecimalAccumulator accumulator) {
>             return accumulator.f0;
>         }
>
>         @Override
>         public LastDecimalAccumulator createAccumulator() {
>             return new LastDecimalAccumulator(null);
>         }
>
>         public void accumulate(LastDecimalAccumulator accumulator,
>                                @DataTypeHint("DECIMAL(38, 18)") BigDecimal 
> value) {
>             if (value != null) {
>                 accumulator.f0 = value;
>             }
>         }
>
>         public void merge(LastDecimalAccumulator accumulator, 
> Iterable<BigDecimal> iterable) {
>             if (iterable != null) {
>                 for (BigDecimal item : iterable) {
>                     accumulator.f0 = item;
>                 }
>             }
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>
>         RowTypeInfo rowTypeInfo = new RowTypeInfo(
>                 new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, 
> Types.BIG_DEC},
>                 new String[] {"rowtime", "id", "val"});
>
>         DataStream<Row> dataStream = env.fromElements(
>                 Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")),
>                 Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")),
>                 Row.of(new Timestamp(3_300L), 2, new BigDecimal("3"))
>         
> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo);
>
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new 
> LastDecimalAggFunction());
>
>         tableEnv.createTemporaryView("InputTable", dataStream, 
> $("rowtime").rowtime(), $("id"), $("val"));
>
>         Table resultTable = tableEnv.sqlQuery("" +
>                 "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " +
>                 "FROM InputTable " +
>                 "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' 
> SECOND), id");
>
>         DataStream<Row> resultStream = tableEnv
>                 .toRetractStream(resultTable, new 
> RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC))
>                 .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> 
> value.f1);
>
>         resultStream.addSink(new SinkFunction<Row>() {
>             @Override
>             public void invoke(Row value, Context context) {
>                 LOGGER.info("SinkFunction.invoke(): value={}", value);
>             }
>         });
>
>         env.execute();
>     }
> }
>
>
> LIU Xiao <liuxiaogen...@gmail.com> 于2021年7月30日周五 上午11:04写道:
>>
>> I'm currently converting our old code (based on Flink 1.6) to Flink 1.13 and 
>> have run into a strange problem with a user-defined aggregate function 
>> that takes a BigDecimal as its parameter and returns a BigDecimal:
>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> SQL validation failed. An error occurred in the type inference logic of 
>>> function 'LAST_DECIMAL'.
>>> at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
>>> at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>>> at 
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>>> at poc.flink.table.PocTimestampUdf.main(PocTimestampUdf.java:101)
>>> Caused by: org.apache.flink.table.api.ValidationException: An error 
>>> occurred in the type inference logic of function 'LAST_DECIMAL'.
>>> at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>>> at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>>> at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>>> at java.base/java.util.Optional.flatMap(Optional.java:294)
>>> at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>>> at 
>>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>>> at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
>>> at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
>>> at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
>>> at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
>>> at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>>> at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>> ... 5 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not 
>>> extract a valid type inference for function class 
>>> 'poc.flink.table.LastDecimalAggFunction'. Please check for implementation 
>>> mistakes and/or provide a corresponding hint.
>>> at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>> at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>>> at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
>>> at 
>>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
>>> at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>>> ... 16 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Error in 
>>> extracting a signature to output mapping.
>>> at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>> at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>>> at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>>> at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>>> ... 19 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Unable to 
>>> extract a type inference from method:
>>> public void 
>>> poc.flink.table.LastDecimalAggFunction.accumulate(poc.flink.table.LastDecimalAccumulator,java.math.BigDecimal)
>>> at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>> at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>>> at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
>>> ... 21 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not 
>>> extract a data type from 'class java.math.BigDecimal' in generic class 
>>> 'org.apache.flink.table.functions.AggregateFunction' in class 
>>> poc.flink.table.LastDecimalAggFunction. Please pass the required data type 
>>> manually or allow RAW types.
>>> at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>> at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
>>> at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
>>> at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
>>> at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
>>> at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
>>> at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
>>> at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
>>> at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
>>> ... 22 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Values of 
>>> 'java.math.BigDecimal' need fixed precision and scale.
>>> at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>> at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
>>> at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractPredefinedType(DataTypeExtractor.java:398)
>>> at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:276)
>>> at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
>>> ... 29 more
>>
>>
>> The program source code:
>>
>> package poc.flink.table;
>>
>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>> import org.apache.flink.table.annotation.DataTypeHint;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.functions.AggregateFunction;
>> import org.apache.flink.types.Row;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.math.BigDecimal;
>> import java.sql.Timestamp;
>>
>> import static org.apache.flink.table.api.Expressions.$;
>>
>> public class PocLastDecimalJob {
>>     private static final Logger LOGGER = 
>> LoggerFactory.getLogger(PocLastDecimalJob.class);
>>
>>     public static class LastDecimalAccumulator extends Tuple1<BigDecimal> {
>>         public LastDecimalAccumulator(BigDecimal f0) {
>>             super(f0);
>>         }
>>     }
>>
>>     public static class LastDecimalAggFunction extends 
>> AggregateFunction<BigDecimal, LastDecimalAccumulator> {
>>
>>         @Override
>>         public BigDecimal getValue(LastDecimalAccumulator accumulator) {
>>             return accumulator.f0;
>>         }
>>
>>         @Override
>>         public LastDecimalAccumulator createAccumulator() {
>>             return new LastDecimalAccumulator(null);
>>         }
>>
>>         public void accumulate(LastDecimalAccumulator accumulator,
>>                                @DataTypeHint("DECIMAL(38, 18)") BigDecimal 
>> value) {
>>             if (value != null) {
>>                 accumulator.f0 = value;
>>             }
>>         }
>>
>>         public void merge(LastDecimalAccumulator accumulator, 
>> Iterable<BigDecimal> iterable) {
>>             if (iterable != null) {
>>                 for (BigDecimal item : iterable) {
>>                     accumulator.f0 = item;
>>                 }
>>             }
>>         }
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>
>>         RowTypeInfo rowTypeInfo = new RowTypeInfo(
>>                 new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, 
>> Types.BIG_DEC},
>>                 new String[] {"rowtime", "id", "val"});
>>
>>         DataStream<Row> dataStream = env.fromElements(
>>                 Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")),
>>                 Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")),
>>                 Row.of(new Timestamp(3_300L), 2, new BigDecimal("3"))
>>         
>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo);
>>
>>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>         tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new 
>> LastDecimalAggFunction());
>>
>>         tableEnv.createTemporaryView("InputTable", dataStream, 
>> $("rowtime.rowtime"), $("id"), $("val"));
>>
>>         Table resultTable = tableEnv.sqlQuery("" +
>>                 "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " +
>>                 "FROM InputTable " +
>>                 "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' 
>> SECOND), id");
>>
>>         DataStream<Row> resultStream = tableEnv
>>                 .toRetractStream(resultTable, new 
>> RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC))
>>                 .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> 
>> value.f1);
>>
>>         resultStream.addSink(new SinkFunction<Row>() {
>>             @Override
>>             public void invoke(Row value, Context context) {
>>                 LOGGER.info("SinkFunction.invoke(): value={}", value);
>>             }
>>         });
>>
>>         env.execute();
>>     }
>> }



-- 
Maciek Bryński

Reply via email to