This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new a54c2e7  [FLINK-25282][table-planner][table-runtime] Move runtime code from table-planner to table-runtime
a54c2e7 is described below

commit a54c2e75eb8f214552643adbdc2a5ce2abc2506c
Author:     slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Dec 9 12:30:18 2021 +0100

    [FLINK-25282][table-planner][table-runtime] Move runtime code from table-planner to table-runtime

    - Remove the dependency on Calcite's SqlFunctions
    - Move DefaultWatermarkGeneratorSupplier to flink-table-runtime and rename it to GeneratedWatermarkGeneratorSupplier
    - Remove the dependency on Calcite's BuiltInMethod for floor, ceil, and abs
    - Copy the Calcite JSON functions into SqlJsonUtils; Jackson and jsonpath are now shipped by flink-table-runtime
    - Move various Flink functions

    This closes #18108.
---
 .../apache/flink/table/functions/SqlLikeUtils.java |  24 +-
 .../apache/flink/table/utils/DateTimeUtils.java    |  89 ++++
 flink-table/flink-table-planner/pom.xml            |   8 +-
 .../abilities/source/WatermarkPushDownSpec.java    |  92 +---
 .../nodes/exec/stream/StreamExecIntervalJoin.java  |  75 +--
 .../stream/StreamExecLegacyTableSourceScan.java    |  82 +---
 .../src/main/resources/META-INF/NOTICE             |   4 -
 .../table/planner/codegen/ExprCodeGenerator.scala  |  18 +-
 .../table/planner/codegen/GenerateUtils.scala      |  32 +-
 .../planner/codegen/calls/BuiltInMethods.scala     |  86 ++--
 .../planner/codegen/calls/FloorCeilCallGen.scala   |  14 +-
 .../planner/codegen/calls/FunctionGenerator.scala  |  81 ++--
 .../planner/codegen/calls/JsonValueCallGen.scala   |  17 +-
 .../table/planner/codegen/calls/LikeCallGen.scala  |   9 +-
 .../planner/codegen/calls/StringCallGen.scala      |   7 +-
 flink-table/flink-table-runtime/pom.xml            |  38 ++
 .../table/runtime/functions/SqlFunctionUtils.java  | 148 ++++++
 .../table/runtime/functions/SqlJsonUtils.java      | 517 ++++++++++++++++++++-
 .../GeneratedWatermarkGeneratorSupplier.java       | 109 +++++
 .../join/interval/FilterAllFlatMapFunction.java    |  48 ++
 .../join/interval/PaddingLeftMapFunction.java      |  53 +++
 .../join/interval/PaddingRightMapFunction.java     |  53 +++
 .../PeriodicWatermarkAssignerWrapper.java          |  57 +++
 .../PunctuatedWatermarkAssignerWrapper.java        |  74 +++
 .../src/main/resources/META-INF/NOTICE             |   9 +
 flink-table/pom.xml                                |   5 +-
 26 files changed, 1393 insertions(+), 356 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
index aa22466..5c3efaf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
@@ -46,12 +46,30 @@ public class SqlLikeUtils {
 
     private SqlLikeUtils() {}
 
-    /** SQL like function with escape. */
+    /** SQL {@code LIKE} function. */
+    public static boolean like(String s, String pattern) {
+        final String regex = sqlToRegexLike(pattern, null);
+        return Pattern.matches(regex, s);
+    }
+
+    /** SQL {@code LIKE} function with escape. */
     public static boolean like(String s, String pattern, String escape) {
         final String regex = sqlToRegexLike(pattern, escape);
         return Pattern.matches(regex, s);
     }
 
+    /** SQL {@code SIMILAR} function. */
+    public static boolean similar(String s, String pattern) {
+        final String regex = sqlToRegexSimilar(pattern, null);
+        return Pattern.matches(regex, s);
+    }
+
+    /** SQL {@code SIMILAR} function with escape. */
+    public static boolean similar(String s, String pattern, String escape) {
+        final String regex = sqlToRegexSimilar(pattern, escape);
+        return Pattern.matches(regex, s);
+    }
+
     /** Translates a SQL LIKE pattern to Java regex pattern, with optional escape string. */
     public static String sqlToRegexLike(String sqlPattern, CharSequence escapeStr) {
         final char escapeChar;
@@ -192,7 +210,7 @@ public class SqlLikeUtils {
     }
 
     /** Translates a SQL SIMILAR pattern to Java regex pattern, with optional escape string. */
-    static String sqlToRegexSimilar(String sqlPattern, CharSequence escapeStr) {
+    public static String sqlToRegexSimilar(String sqlPattern, CharSequence escapeStr) {
         final char escapeChar;
         if (escapeStr != null) {
             if (escapeStr.length() != 1) {
@@ -206,7 +224,7 @@ public class SqlLikeUtils {
     }
 
     /** Translates SQL SIMILAR pattern to Java regex pattern. */
-    static String sqlToRegexSimilar(String sqlPattern, char escapeChar) {
+    public static String sqlToRegexSimilar(String sqlPattern, char escapeChar) {
         similarEscapeRuleChecking(sqlPattern, escapeChar);
 
         boolean insideCharacterEnumeration = false;
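For reference, the widened SqlLikeUtils surface above can be exercised directly; a minimal usage sketch (the results in the comments follow from the standard SQL LIKE/SIMILAR TO semantics that sqlToRegexLike/sqlToRegexSimilar implement):

    import org.apache.flink.table.functions.SqlLikeUtils;

    public class SqlLikeUtilsExample {
        public static void main(String[] args) {
            // LIKE: '%' matches any sequence, '_' matches a single character.
            System.out.println(SqlLikeUtils.like("abcd", "ab%"));        // true
            System.out.println(SqlLikeUtils.like("abcd", "a_cd"));       // true
            // With an escape character ('!' here), '%' is matched literally.
            System.out.println(SqlLikeUtils.like("50%", "50!%", "!"));   // true
            // SIMILAR TO: the pattern must match the whole string.
            System.out.println(SqlLikeUtils.similar("abcd", "(a|b)b%")); // true
        }
    }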
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index 566956a..51b0090 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -1624,6 +1624,95 @@ public class DateTimeUtils {
     }
 
     // --------------------------------------------------------------------------------------------
+    // ADD/REMOVE months
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Adds a given number of months to a timestamp, represented as the number of milliseconds since
+     * the epoch.
+     */
+    public static long addMonths(long timestamp, int m) {
+        final long millis = DateTimeUtils.floorMod(timestamp, DateTimeUtils.MILLIS_PER_DAY);
+        timestamp -= millis;
+        final long x = addMonths((int) (timestamp / DateTimeUtils.MILLIS_PER_DAY), m);
+        return x * DateTimeUtils.MILLIS_PER_DAY + millis;
+    }
+
+    /**
+     * Adds a given number of months to a date, represented as the number of days since the epoch.
+     */
+    public static int addMonths(int date, int m) {
+        int y0 = (int) extractFromDate(TimeUnitRange.YEAR, date);
+        int m0 = (int) extractFromDate(TimeUnitRange.MONTH, date);
+        int d0 = (int) extractFromDate(TimeUnitRange.DAY, date);
+        m0 += m;
+        int deltaYear = (int) DateTimeUtils.floorDiv(m0, 12);
+        y0 += deltaYear;
+        m0 = (int) DateTimeUtils.floorMod(m0, 12);
+        if (m0 == 0) {
+            y0 -= 1;
+            m0 += 12;
+        }
+
+        int last = lastDay(y0, m0);
+        if (d0 > last) {
+            d0 = last;
+        }
+        return ymdToUnixDate(y0, m0, d0);
+    }
+
+    private static int lastDay(int y, int m) {
+        switch (m) {
+            case 2:
+                return y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) ? 29 : 28;
+            case 4:
+            case 6:
+            case 9:
+            case 11:
+                return 30;
+            default:
+                return 31;
+        }
+    }
+
+    /**
+     * Finds the number of months between two dates, each represented as the number of days since
+     * the epoch.
+     */
+    public static int subtractMonths(int date0, int date1) {
+        if (date0 < date1) {
+            return -subtractMonths(date1, date0);
+        }
+        // Start with an estimate.
+        // Since no month has more than 31 days, the estimate is <= the true value.
+        int m = (date0 - date1) / 31;
+        while (true) {
+            int date2 = addMonths(date1, m);
+            if (date2 >= date0) {
+                return m;
+            }
+            int date3 = addMonths(date1, m + 1);
+            if (date3 > date0) {
+                return m;
+            }
+            ++m;
+        }
+    }
+
+    public static int subtractMonths(long t0, long t1) {
+        final long millis0 = DateTimeUtils.floorMod(t0, DateTimeUtils.MILLIS_PER_DAY);
+        final int d0 = (int) DateTimeUtils.floorDiv(t0 - millis0, DateTimeUtils.MILLIS_PER_DAY);
+        final long millis1 = DateTimeUtils.floorMod(t1, DateTimeUtils.MILLIS_PER_DAY);
+        final int d1 = (int) DateTimeUtils.floorDiv(t1 - millis1, DateTimeUtils.MILLIS_PER_DAY);
+        int x = subtractMonths(d0, d1);
+        final long d2 = addMonths(d1, x);
+        if (d2 == d0 && millis0 < millis1) {
+            --x;
+        }
+        return x;
+    }
+
+    // --------------------------------------------------------------------------------------------
     // TimeUnit and TimeUnitRange enums
     // --------------------------------------------------------------------------------------------
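The essential property of the new addMonths is the end-of-month clamp via lastDay. A small sketch of the expected behavior (dates are days since the epoch; this assumes the ymdToUnixDate helper that the new code itself calls is accessible to the caller):

    import org.apache.flink.table.utils.DateTimeUtils;

    public class AddMonthsExample {
        public static void main(String[] args) {
            int jan31 = DateTimeUtils.ymdToUnixDate(2021, 1, 31);
            int feb28 = DateTimeUtils.ymdToUnixDate(2021, 2, 28);
            // 2021-01-31 + 1 month clamps to the last day of February.
            System.out.println(DateTimeUtils.addMonths(jan31, 1) == feb28); // true
            // subtractMonths counts whole months between the two dates.
            System.out.println(DateTimeUtils.subtractMonths(feb28, jan31)); // 1
        }
    }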
diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml
index 8dd4dc9..d89cfc3 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -389,10 +389,6 @@ under the License.
 							<include>com.esri.geometry:esri-geometry-api</include>
 							<include>com.google.guava:guava</include>
 							<include>com.google.guava:failureaccess</include>
-							<include>com.jayway.jsonpath:json-path</include>
-							<include>com.fasterxml.jackson.core:jackson-core</include>
-							<include>com.fasterxml.jackson.core:jackson-databind</include>
-							<include>com.fasterxml.jackson.core:jackson-annotations</include>
 							<include>commons-codec:commons-codec</include>
 							<include>commons-io:commons-io</include>
 
@@ -426,12 +422,14 @@ under the License.
 							<shadedPattern>org.apache.flink.calcite.shaded.com.google</shadedPattern>
 						</relocation>
 						<relocation>
+							<!-- Packaged in runtime -->
 							<pattern>com.jayway</pattern>
 							<shadedPattern>org.apache.flink.calcite.shaded.com.jayway</shadedPattern>
 						</relocation>
 						<relocation>
+							<!-- Packaged in runtime -->
 							<pattern>com.fasterxml</pattern>
-							<shadedPattern>org.apache.flink.calcite.shaded.com.fasterxml</shadedPattern>
+							<shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml</shadedPattern>
 						</relocation>
 						<relocation>
 							<pattern>org.apache.commons.codec</pattern>
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
index 2648d31..8d20453 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.table.planner.plan.abilities.source;
 
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableException;
@@ -32,6 +29,7 @@ import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -41,9 +39,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.calcite.rex.RexNode;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 import scala.Option;
 
@@ -86,7 +81,7 @@ public class WatermarkPushDownSpec extends SourceAbilitySpecBase {
             Configuration configuration = context.getTableConfig().getConfiguration();
 
             WatermarkGeneratorSupplier<RowData> supplier =
-                    new DefaultWatermarkGeneratorSupplier(
+                    new GeneratedWatermarkGeneratorSupplier(
                             configuration, generatedWatermarkGenerator);
             WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier);
 
@@ -116,87 +111,4 @@ public class WatermarkPushDownSpec extends SourceAbilitySpecBase {
         }
         return String.format("watermark=[%s]", expressionStr);
     }
-
-    /**
-     * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to create {@link
-     * WatermarkGenerator}. The {@link DefaultWatermarkGeneratorSupplier} uses the {@link
-     * WatermarkGeneratorSupplier.Context} to init the generated watermark generator.
-     */
-    public static class DefaultWatermarkGeneratorSupplier
-            implements WatermarkGeneratorSupplier<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final Configuration configuration;
-        private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
-
-        public DefaultWatermarkGeneratorSupplier(
-                Configuration configuration,
-                GeneratedWatermarkGenerator generatedWatermarkGenerator) {
-            this.configuration = configuration;
-            this.generatedWatermarkGenerator = generatedWatermarkGenerator;
-        }
-
-        @Override
-        public WatermarkGenerator<RowData> createWatermarkGenerator(Context context) {
-
-            List<Object> references =
-                    new ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
-            references.add(context);
-
-            org.apache.flink.table.runtime.generated.WatermarkGenerator innerWatermarkGenerator =
-                    new GeneratedWatermarkGenerator(
-                                    generatedWatermarkGenerator.getClassName(),
-                                    generatedWatermarkGenerator.getCode(),
-                                    references.toArray(),
-                                    configuration)
-                            .newInstance(Thread.currentThread().getContextClassLoader());
-
-            try {
-                innerWatermarkGenerator.open(configuration);
-            } catch (Exception e) {
-                throw new RuntimeException("Fail to instantiate generated watermark generator.", e);
-            }
-            return new DefaultWatermarkGeneratorSupplier.DefaultWatermarkGenerator(
-                    innerWatermarkGenerator);
-        }
-
-        /**
-         * Wrapper of the code-generated {@link
-         * org.apache.flink.table.runtime.generated.WatermarkGenerator}.
-         */
-        public static class DefaultWatermarkGenerator implements WatermarkGenerator<RowData> {
-            private static final long serialVersionUID = 1L;
-
-            private final org.apache.flink.table.runtime.generated.WatermarkGenerator
-                    innerWatermarkGenerator;
-            private Long currentWatermark = Long.MIN_VALUE;
-
-            public DefaultWatermarkGenerator(
-                    org.apache.flink.table.runtime.generated.WatermarkGenerator
-                            watermarkGenerator) {
-                this.innerWatermarkGenerator = watermarkGenerator;
-            }
-
-            @Override
-            public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) {
-                try {
-                    Long watermark = innerWatermarkGenerator.currentWatermark(event);
-                    if (watermark != null) {
-                        currentWatermark = watermark;
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "Generated WatermarkGenerator fails to generate for row: %s.",
-                                    event),
-                            e);
-                }
-            }
-
-            @Override
-            public void onPeriodicEmit(WatermarkOutput output) {
-                output.emitWatermark(new Watermark(currentWatermark));
-            }
-        }
-    }
 }
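For reference, a minimal sketch of how the relocated supplier is wired into a WatermarkStrategy, mirroring the new code in WatermarkPushDownSpec above (configuration and generatedWatermarkGenerator are assumed to come from the surrounding planner context):

    import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
    import org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;

    class WatermarkWiringSketch {
        WatermarkStrategy<RowData> buildStrategy(
                Configuration configuration,
                GeneratedWatermarkGenerator generatedWatermarkGenerator) {
            // The supplier instantiates and opens the code-generated watermark generator
            // when the strategy is used, as the removed inner class used to do.
            WatermarkGeneratorSupplier<RowData> supplier =
                    new GeneratedWatermarkGeneratorSupplier(
                            configuration, generatedWatermarkGenerator);
            return WatermarkStrategy.forGenerator(supplier);
        }
    }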
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index b5a8163..036d71e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamFlatMap;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -47,12 +43,14 @@ import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
 import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
+import org.apache.flink.table.runtime.operators.join.interval.FilterAllFlatMapFunction;
 import org.apache.flink.table.runtime.operators.join.interval.IntervalJoinFunction;
+import org.apache.flink.table.runtime.operators.join.interval.PaddingLeftMapFunction;
+import org.apache.flink.table.runtime.operators.join.interval.PaddingRightMapFunction;
 import org.apache.flink.table.runtime.operators.join.interval.ProcTimeIntervalJoin;
 import org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
@@ -196,73 +194,6 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData>
         }
     }
 
-    private static class FilterAllFlatMapFunction
-            implements FlatMapFunction<RowData, RowData>, ResultTypeQueryable<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final InternalTypeInfo<RowData> outputTypeInfo;
-
-        public FilterAllFlatMapFunction(InternalTypeInfo<RowData> inputTypeInfo) {
-            this.outputTypeInfo = inputTypeInfo;
-        }
-
-        @Override
-        public void flatMap(RowData value, Collector<RowData> out) {}
-
-        @Override
-        public TypeInformation<RowData> getProducedType() {
-            return outputTypeInfo;
-        }
-    }
-
-    private static class PaddingLeftMapFunction
-            implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final OuterJoinPaddingUtil paddingUtil;
-        private final InternalTypeInfo<RowData> outputTypeInfo;
-
-        public PaddingLeftMapFunction(
-                OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> returnType) {
-            this.paddingUtil = paddingUtil;
-            this.outputTypeInfo = returnType;
-        }
-
-        @Override
-        public RowData map(RowData value) {
-            return paddingUtil.padLeft(value);
-        }
-
-        @Override
-        public TypeInformation<RowData> getProducedType() {
-            return outputTypeInfo;
-        }
-    }
-
-    private static class PaddingRightMapFunction
-            implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final OuterJoinPaddingUtil paddingUtil;
-        private final InternalTypeInfo<RowData> outputTypeInfo;
-
-        public PaddingRightMapFunction(
-                OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> returnType) {
-            this.paddingUtil = paddingUtil;
-            this.outputTypeInfo = returnType;
-        }
-
-        @Override
-        public RowData map(RowData value) {
-            return paddingUtil.padRight(value);
-        }
-
-        @Override
-        public TypeInformation<RowData> getProducedType() {
-            return outputTypeInfo;
-        }
-    }
-
     private Transformation<RowData> createNegativeWindowSizeJoin(
             JoinSpec joinSpec,
             Transformation<RowData> leftInputTransform,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
index 50b87ab..e8b182d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
@@ -25,12 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator;
@@ -41,6 +36,8 @@ import org.apache.flink.table.planner.plan.utils.ScanUtil;
 import org.apache.flink.table.planner.sources.TableSourceUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.wmassigners.PeriodicWatermarkAssignerWrapper;
+import org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkAssignerWrapper;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
@@ -48,9 +45,7 @@ import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
 import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
 import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
@@ -176,77 +171,4 @@ public class StreamExecLegacyTableSourceScan extends CommonExecLegacyTableSource
                 .name(tableSource.explainSource())
                 .getTransformation();
     }
-
-    /** Generates periodic watermarks based on a {@link PeriodicWatermarkAssigner}. */
-    private static class PeriodicWatermarkAssignerWrapper
-            implements AssignerWithPeriodicWatermarks<RowData> {
-        private static final long serialVersionUID = 1L;
-        private final PeriodicWatermarkAssigner assigner;
-        private final int timeFieldIdx;
-
-        /**
-         * @param timeFieldIdx the index of the rowtime attribute.
-         * @param assigner the watermark assigner.
-         */
-        private PeriodicWatermarkAssignerWrapper(
-                PeriodicWatermarkAssigner assigner, int timeFieldIdx) {
-            this.assigner = assigner;
-            this.timeFieldIdx = timeFieldIdx;
-        }
-
-        @Nullable
-        @Override
-        public Watermark getCurrentWatermark() {
-            return assigner.getWatermark();
-        }
-
-        @Override
-        public long extractTimestamp(RowData row, long recordTimestamp) {
-            long timestamp = row.getTimestamp(timeFieldIdx, 3).getMillisecond();
-            assigner.nextTimestamp(timestamp);
-            return 0;
-        }
-    }
-
-    /** Generates periodic watermarks based on a [[PunctuatedWatermarkAssigner]]. */
-    private static class PunctuatedWatermarkAssignerWrapper
-            implements AssignerWithPunctuatedWatermarks<RowData> {
-        private static final long serialVersionUID = 1L;
-        private final PunctuatedWatermarkAssigner assigner;
-        private final int timeFieldIdx;
-        private final DataFormatConverters.DataFormatConverter<RowData, Row> converter;
-
-        /**
-         * @param timeFieldIdx the index of the rowtime attribute.
-         * @param assigner the watermark assigner.
-         * @param sourceType the type of source
-         */
-        @SuppressWarnings("unchecked")
-        private PunctuatedWatermarkAssignerWrapper(
-                PunctuatedWatermarkAssigner assigner, int timeFieldIdx, DataType sourceType) {
-            this.assigner = assigner;
-            this.timeFieldIdx = timeFieldIdx;
-            DataType originDataType;
-            if (sourceType instanceof FieldsDataType) {
-                originDataType = sourceType;
-            } else {
-                originDataType = DataTypes.ROW(DataTypes.FIELD("f0", sourceType));
-            }
-            converter =
-                    DataFormatConverters.getConverterForDataType(
-                            originDataType.bridgedTo(Row.class));
-        }
-
-        @Nullable
-        @Override
-        public Watermark checkAndGetNextWatermark(RowData row, long extractedTimestamp) {
-            long timestamp = row.getLong(timeFieldIdx);
-            return assigner.getWatermark(converter.toExternal(row), timestamp);
-        }
-
-        @Override
-        public long extractTimestamp(RowData element, long recordTimestamp) {
-            return 0;
-        }
-    }
 }
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
index 08a07c7..2b53463 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
@@ -9,10 +9,6 @@ This project bundles the following dependencies under the Apache Software Licens
 - com.google.guava:guava:29.0-jre
 - com.google.guava:failureaccess:1.0.1
 - com.esri.geometry:esri-geometry-api:2.2.0
-- com.fasterxml.jackson.core:jackson-annotations:2.13.0
-- com.fasterxml.jackson.core:jackson-core:2.13.0
-- com.fasterxml.jackson.core:jackson-databind:2.13.0
-- com.jayway.jsonpath:json-path:2.4.0
 - org.apache.calcite:calcite-core:1.26.0
 - org.apache.calcite:calcite-linq4j:1.26.0
 - org.apache.calcite.avatica:avatica-core:1.17.0
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 6009cc1..f083c39 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -18,6 +18,10 @@
 
 package org.apache.flink.table.planner.codegen
 
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.{SqlKind, SqlOperator}
+import org.apache.calcite.util.{Sarg, TimestampString}
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.data.RowData
@@ -42,10 +46,6 @@ import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isNumeric, isTem
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount, isCompositeType}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
-import org.apache.calcite.sql.{SqlKind, SqlOperator}
-import org.apache.calcite.util.{Sarg, TimestampString}
 
 import scala.collection.JavaConversions._
 
@@ -418,15 +418,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
           case _ => literal.getValue3
         }
 
-        // Make sure to convert avatica time types to flink internal types
-        val convertedValue = value match {
-          case tu: org.apache.calcite.avatica.util.TimeUnit =>
-            org.apache.flink.table.utils.DateTimeUtils.TimeUnit.valueOf(tu.name())
-          case tur: org.apache.calcite.avatica.util.TimeUnitRange =>
-            org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange.valueOf(tur.name())
-          case _ => value
-        }
-        generateLiteral(ctx, resultType, convertedValue)
+        generateLiteral(ctx, resultType, value)
       }
 
   override def visitCorrelVariable(correlVariable: RexCorrelVariable): GeneratedExpression = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 3fa5a27..fcf92b4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType => AtomicTypeInfo}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.table.api.{JsonExistsOnError, JsonQueryOnEmptyOrError, JsonQueryWrapper}
 import org.apache.flink.table.data._
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.utils.JoinedRowData
@@ -30,13 +30,14 @@ import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL,
 import org.apache.flink.table.planner.codegen.calls.CurrentTimePointCallGen
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec
 import org.apache.flink.table.planner.plan.utils.SortUtil
+import org.apache.flink.table.planner.utils.TimestampStringUtils.toLocalDateTime
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString, isReference, isTemporal}
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount, getFieldTypes}
-import org.apache.flink.table.planner.utils.TimestampStringUtils.toLocalDateTime
 
 import org.apache.calcite.avatica.util.ByteString
+import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior}
 import org.apache.calcite.util.TimestampString
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -443,13 +444,34 @@ object GenerateUtils {
     }
   }
 
-  def generateSymbol(enum: Enum[_]): GeneratedExpression = {
+  def generateSymbol(value: Enum[_]): GeneratedExpression = {
+    // Make sure to convert calcite enum types to flink types
+    val convertedValue = value match {
+      case tu: org.apache.calcite.avatica.util.TimeUnit =>
+        org.apache.flink.table.utils.DateTimeUtils.TimeUnit.valueOf(tu.name())
+      case tur: org.apache.calcite.avatica.util.TimeUnitRange =>
+        org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange.valueOf(tur.name())
+      case jeeb: SqlJsonExistsErrorBehavior =>
+        JsonExistsOnError.valueOf(jeeb.name())
+      case jqeeb: SqlJsonQueryEmptyOrErrorBehavior =>
+        JsonQueryOnEmptyOrError.valueOf(jqeeb.name())
+      case jqwb: SqlJsonQueryWrapperBehavior => jqwb match {
+        case SqlJsonQueryWrapperBehavior.WITHOUT_ARRAY =>
+          JsonQueryWrapper.WITHOUT_ARRAY
+        case SqlJsonQueryWrapperBehavior.WITH_CONDITIONAL_ARRAY =>
+          JsonQueryWrapper.CONDITIONAL_ARRAY
+        case SqlJsonQueryWrapperBehavior.WITH_UNCONDITIONAL_ARRAY =>
+          JsonQueryWrapper.UNCONDITIONAL_ARRAY
+      }
+      case _ => value
+    }
+
     GeneratedExpression(
-      qualifyEnum(enum),
+      qualifyEnum(convertedValue),
       NEVER_NULL,
       NO_CODE,
       new SymbolType(false),
-      literalValue = Some(enum))
+      literalValue = Some(convertedValue))
   }
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 0ee5b96..0b5e6a7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -18,14 +18,15 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
+import org.apache.flink.table.api.{JsonExistsOnError, JsonQueryOnEmptyOrError, JsonQueryWrapper, JsonValueOnEmptyOrError}
+import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUtil}
 import org.apache.flink.table.data.{DecimalData, DecimalDataUtils, TimestampData}
+import org.apache.flink.table.functions.SqlLikeUtils
 import org.apache.flink.table.runtime.functions._
 import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
+
 import org.apache.calcite.linq4j.tree.Types
-import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
-import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
-import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUtil}
 
 import java.lang.reflect.Method
 import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort}
@@ -82,17 +83,22 @@ object BuiltInMethods {
   val POWER_DEC_NUM = Types.lookupMethod(
     classOf[SqlFunctionUtils], "power", classOf[DecimalData], classOf[Double])
 
-  // TRIGONOMETRIC FUNCTIONS
   val LN = Types.lookupMethod(classOf[SqlFunctionUtils], "log", classOf[Double])
   val LN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "log", classOf[DecimalData])
 
-  val ABS = Types.lookupMethod(classOf[SqlFunctions], "abs", classOf[Double])
+  val ABS = Types.lookupMethod(classOf[SqlFunctionUtils], "abs", classOf[Double])
   val ABS_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "abs", classOf[DecimalData])
 
+  val FLOOR = Types.lookupMethod(classOf[SqlFunctionUtils], "floor", classOf[Double])
+  val FLOOR_INTEGRAL = Types.lookupMethod(classOf[SqlFunctionUtils], "floor", classOf[Int],
+    classOf[Int])
   val FLOOR_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "floor", classOf[DecimalData])
 
+  val CEIL = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil", classOf[Double])
+  val CEIL_INTEGRAL = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil", classOf[Int],
+    classOf[Int])
   val CEIL_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil", classOf[DecimalData])
 
   val SIN = Types.lookupMethod(classOf[Math], "sin", classOf[Double])
@@ -104,7 +110,7 @@ object BuiltInMethods {
   val TAN = Types.lookupMethod(classOf[Math], "tan", classOf[Double])
   val TAN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "tan", classOf[DecimalData])
 
-  val COT = Types.lookupMethod(classOf[SqlFunctions], "cot", classOf[Double])
+  val COT = Types.lookupMethod(classOf[SqlFunctionUtils], "cot", classOf[Double])
   val COT_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "cot", classOf[DecimalData])
 
   val ASIN = Types.lookupMethod(classOf[Math], "asin", classOf[Double])
@@ -147,10 +153,12 @@ object BuiltInMethods {
   val SIGN_LONG = Types.lookupMethod(classOf[JLong], "signum", classOf[Long])
   val SIGN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "sign", classOf[DecimalData])
 
-  val ROUND_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "sround", classOf[Double],
+  val ROUND_DOUBLE = Types.lookupMethod(classOf[SqlFunctionUtils], "sround", classOf[Double],
+    classOf[Int])
+  val ROUND_INT = Types.lookupMethod(classOf[SqlFunctionUtils], "sround", classOf[Int],
+    classOf[Int])
+  val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctionUtils], "sround", classOf[Long],
     classOf[Int])
-  val ROUND_INT = Types.lookupMethod(classOf[SqlFunctions], "sround", classOf[Int], classOf[Int])
-  val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctions], "sround", classOf[Long], classOf[Int])
   val ROUND_BYTE = Types.lookupMethod(classOf[SqlFunctionUtils], "sround", classOf[Byte],
     classOf[Int])
   val ROUND_SHORT = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
@@ -399,24 +407,24 @@ object BuiltInMethods {
   val STRING_TO_TIME = Types.lookupMethod(
     classOf[DateTimeUtils], "parseTime", classOf[String])
 
-  val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[Double])
   val TRUNCATE_FLOAT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[Float])
-  val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
    classOf[Int])
-  val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
    classOf[Long])
   val TRUNCATE_DEC_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
    classOf[DecimalData])
-  val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[Double], classOf[Int])
   val TRUNCATE_FLOAT = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[Float], classOf[Int])
-  val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[Int], classOf[Int])
-  val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[Long], classOf[Int])
   val TRUNCATE_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[DecimalData], classOf[Int])
@@ -439,29 +447,41 @@ object BuiltInMethods {
   val TRUNCATE_SQL_TIMESTAMP = Types.lookupMethod(classOf[DateTimeUtils], "truncate",
     classOf[TimestampData], classOf[Int])
 
-  val ADD_MONTHS = Types.lookupMethod(classOf[SqlFunctions], "addMonths",
+  val ADD_MONTHS = Types.lookupMethod(classOf[DateTimeUtils], "addMonths",
     classOf[Long], classOf[Int])
-  val SUBTRACT_MONTHS = Types.lookupMethod(classOf[SqlFunctions], "subtractMonths",
+  val SUBTRACT_MONTHS = Types.lookupMethod(classOf[DateTimeUtils], "subtractMonths",
    classOf[Long], classOf[Long])
 
   // JSON functions
-  val JSON_EXISTS = Types.lookupMethod(classOf[JsonFunctions], "jsonExists",
+  val JSON_EXISTS = Types.lookupMethod(classOf[SqlJsonUtils], "jsonExists",
    classOf[String], classOf[String])
-  val JSON_EXISTS_ON_ERROR = Types.lookupMethod(classOf[JsonFunctions], "jsonExists",
-    classOf[String], classOf[String], classOf[SqlJsonExistsErrorBehavior])
+  val JSON_EXISTS_ON_ERROR = Types.lookupMethod(classOf[SqlJsonUtils], "jsonExists",
+    classOf[String], classOf[String], classOf[JsonExistsOnError])
 
-  val JSON_VALUE = Types.lookupMethod(classOf[JsonFunctions], "jsonValue",
+  val JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "jsonValue",
     classOf[String], classOf[String],
-    classOf[SqlJsonValueEmptyOrErrorBehavior], classOf[Any],
-    classOf[SqlJsonValueEmptyOrErrorBehavior], classOf[Any]
+    classOf[JsonValueOnEmptyOrError], classOf[Any],
+    classOf[JsonValueOnEmptyOrError], classOf[Any]
   )
 
-  val JSON_QUERY = Types.lookupMethod(classOf[JsonFunctions], "jsonQuery",
-    classOf[String], classOf[String], classOf[SqlJsonQueryWrapperBehavior],
-    classOf[SqlJsonQueryEmptyOrErrorBehavior], classOf[SqlJsonQueryEmptyOrErrorBehavior])
+  val JSON_QUERY = Types.lookupMethod(classOf[SqlJsonUtils], "jsonQuery",
+    classOf[String], classOf[String], classOf[JsonQueryWrapper],
+    classOf[JsonQueryOnEmptyOrError], classOf[JsonQueryOnEmptyOrError])
+
+  val IS_JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonValue",
+    classOf[String])
+
+  val IS_JSON_OBJECT = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonObject",
+    classOf[String])
+
+  val IS_JSON_ARRAY = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonArray",
+    classOf[String])
+
+  val IS_JSON_SCALAR = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonScalar",
+    classOf[String])
 
   // STRING functions
 
@@ -522,6 +542,20 @@ object BuiltInMethods {
   val STRING_DATA_TO_TIMESTAMP_WITH_ZONE = Types.lookupMethod(
     classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData], classOf[TimeZone])
 
+  val STRING_LIKE = Types.lookupMethod(
+    classOf[SqlLikeUtils], "like", classOf[String], classOf[String])
+
+  val STRING_LIKE_WITH_ESCAPE = Types.lookupMethod(
+    classOf[SqlLikeUtils], "like", classOf[String], classOf[String], classOf[String])
+
+  val STRING_SIMILAR = Types.lookupMethod(
+    classOf[SqlLikeUtils], "similar", classOf[String], classOf[String])
+
+  val STRING_SIMILAR_WITH_ESCAPE = Types.lookupMethod(
+    classOf[SqlLikeUtils], "similar", classOf[String], classOf[String], classOf[String])
+
+  val STRING_INITCAP = Types.lookupMethod(classOf[SqlFunctionUtils], "initcap", classOf[String])
+
   // DecimalData functions
 
   val DECIMAL_TO_DECIMAL = Types.lookupMethod(
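With the lookups above now targeting the runtime's own SqlJsonUtils, the JSON functions no longer route through Calcite. A usage sketch against the newly registered signatures; the results in the comments are expectations based on the Calcite implementation this code was copied from (per the commit message):

    import org.apache.flink.table.api.JsonExistsOnError;
    import org.apache.flink.table.runtime.functions.SqlJsonUtils;

    public class SqlJsonUtilsExample {
        public static void main(String[] args) {
            String json = "{\"a\": [1, 2]}";
            // JSON_EXISTS with the default error behavior.
            System.out.println(SqlJsonUtils.jsonExists(json, "lax $.a"));  // expected: true
            // In strict mode a missing member is an error; FALSE ON ERROR maps it to false.
            System.out.println(
                    SqlJsonUtils.jsonExists(json, "strict $.b", JsonExistsOnError.FALSE)); // expected: false
            System.out.println(SqlJsonUtils.isJsonArray("[1, 2]"));        // expected: true
        }
    }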
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
index e9e9df8..8333af7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
@@ -22,7 +22,6 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils.{TIMESTAMP_DATA, getE
 import org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
-
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange._
 
@@ -34,6 +33,8 @@ import java.util.TimeZone
  */
 class FloorCeilCallGen(
     arithmeticMethod: Method,
+    arithmeticIntegralMethod: Option[Method] = None,
+    decimalMethod: Option[Method] = None,
     temporalMethod: Option[Method] = None)
   extends MethodCallGen(arithmeticMethod) {
 
@@ -49,7 +50,7 @@ class FloorCeilCallGen(
       case LogicalTypeRoot.DECIMAL =>
         generateCallIfArgsNotNull(ctx, returnType, operands) {
           operandResultTerms =>
-            s"${qualifyMethod(arithmeticMethod)}(${operandResultTerms.mkString(", ")})"
+            s"${qualifyMethod(decimalMethod.get)}(${operandResultTerms.mkString(", ")})"
         }
 
       case _ => operands.head // no floor/ceil necessary
@@ -98,13 +99,14 @@ class FloorCeilCallGen(
       case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
         val longTerm = s"${terms.head}.getMillisecond()"
         s"""
-          |$TIMESTAMP_DATA.fromEpochMillis(${qualifyMethod(arithmeticMethod)}(
-          |  $longTerm,
-          |  (long) ${unit.startUnit.multiplier.intValue()}))
+          |$TIMESTAMP_DATA.fromEpochMillis(
+          |  ${qualifyMethod(arithmeticIntegralMethod.get)}(
+          |    $longTerm,
+          |    (long) ${unit.startUnit.multiplier.intValue()}))
         """.stripMargin
       case _ =>
         s"""
-          |${qualifyMethod(arithmeticMethod)}(
+          |${qualifyMethod(arithmeticIntegralMethod.get)}(
          |  ($internalType) ${terms.head},
          |  ($internalType) ${unit.startUnit.multiplier.intValue()})
          |""".stripMargin
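For the integral branch above, the generated expression reduces to a call like SqlFunctionUtils.floor(long, long) with the unit's millisecond multiplier. A sketch of that arithmetic (the HOUR multiplier of 3,600,000 ms is assumed here for illustration; floor/ceil(long, long) are the methods added to SqlFunctionUtils later in this diff):

    import org.apache.flink.table.runtime.functions.SqlFunctionUtils;

    public class FloorToHourSketch {
        public static void main(String[] args) {
            long millis = 7_500_000L;   // 02:05:00 as epoch millis
            long perHour = 3_600_000L;  // assumed HOUR multiplier in millis
            // FLOOR(ts TO HOUR): round down to the nearest multiple of the unit.
            System.out.println(SqlFunctionUtils.floor(millis, perHour)); // 7200000 -> 02:00:00
            System.out.println(SqlFunctionUtils.ceil(millis, perHour));  // 10800000 -> 03:00:00
        }
    }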
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index b7ce412..22cb1a9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
+import org.apache.calcite.sql.SqlOperator
 import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.configuration.ExecutionOptions
 import org.apache.flink.table.api.TableConfig
@@ -26,11 +27,7 @@ import org.apache.flink.table.runtime.types.PlannerTypeUtils.isPrimitive
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
 
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.util.BuiltInMethod
-
 import java.lang.reflect.Method
-
 import scala.collection.mutable
 
 class FunctionGenerator private(config: TableConfig) {
@@ -115,22 +112,22 @@ class FunctionGenerator private(config: TableConfig) {
   addSqlFunction(
     FLOOR,
     Seq(DOUBLE),
-    new FloorCeilCallGen(BuiltInMethod.FLOOR.method))
+    new FloorCeilCallGen(BuiltInMethods.FLOOR))
 
-  addSqlFunction(
+  addSqlFunctionMethod(
     FLOOR,
     Seq(DECIMAL),
-    new FloorCeilCallGen(BuiltInMethods.FLOOR_DEC))
+    BuiltInMethods.FLOOR_DEC)
 
   addSqlFunction(
     CEIL,
     Seq(DOUBLE),
-    new FloorCeilCallGen(BuiltInMethod.CEIL.method))
+    new FloorCeilCallGen(BuiltInMethods.CEIL))
 
-  addSqlFunction(
+  addSqlFunctionMethod(
     CEIL,
     Seq(DECIMAL),
-    new FloorCeilCallGen(BuiltInMethods.CEIL_DEC))
+    BuiltInMethods.CEIL_DEC)
 
   addSqlFunctionMethod(
     SIN,
@@ -462,28 +459,36 @@ class FunctionGenerator private(config: TableConfig) {
     FLOOR,
     Seq(DATE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.UNIX_DATE_FLOOR)))
 
   addSqlFunction(
     FLOOR,
     Seq(TIME_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.UNIX_DATE_FLOOR)))
 
   addSqlFunction(
     FLOOR,
     Seq(TIMESTAMP_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.UNIX_TIMESTAMP_FLOOR)))
 
   addSqlFunction(
     FLOOR,
     Seq(TIMESTAMP_WITH_LOCAL_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.TIMESTAMP_FLOOR_TIME_ZONE)))
 
   // TODO: fixme if CALCITE-3199 fixed
@@ -492,28 +497,36 @@ class FunctionGenerator private(config: TableConfig) {
     CEIL,
     Seq(DATE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.UNIX_DATE_CEIL)))
 
   addSqlFunction(
     CEIL,
     Seq(TIME_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.UNIX_DATE_CEIL)))
 
   addSqlFunction(
     CEIL,
     Seq(TIMESTAMP_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.UNIX_TIMESTAMP_CEIL)))
 
   addSqlFunction(
     CEIL,
     Seq(TIMESTAMP_WITH_LOCAL_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.TIMESTAMP_CEIL_TIME_ZONE)))
 
   addSqlFunction(
@@ -821,44 +834,44 @@ class FunctionGenerator private(config: TableConfig) {
     BuiltInMethods.JSON_QUERY)
 
   addSqlFunctionMethod(IS_JSON_VALUE, Seq(CHAR),
-    BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_VALUE, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_VALUE, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_VALUE, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_OBJECT, Seq(CHAR),
-    BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_OBJECT, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_ARRAY, Seq(CHAR),
-    BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_ARRAY, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_SCALAR, Seq(CHAR),
-    BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_SCALAR, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)
 
   addSqlFunction(IS_NOT_JSON_VALUE, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_VALUE, argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_VALUE, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_VALUE, argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_OBJECT, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_OBJECT, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_ARRAY, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_ARRAY, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_SCALAR, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_SCALAR, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)))
 
 // ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
index 0b54a5c..376b46b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
@@ -17,7 +17,9 @@
  */
 
 package org.apache.flink.table.planner.codegen.calls
+
 import org.apache.calcite.sql.{SqlJsonEmptyOrError, SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.flink.table.api.JsonValueOnEmptyOrError
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, qualifyEnum, qualifyMethod}
 import org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
@@ -81,16 +83,16 @@ class JsonValueCallGen extends CallGenerator {
    */
  private def getBehavior(
      operands: Seq[GeneratedExpression],
-      mode: SqlJsonEmptyOrError): (SqlJsonValueEmptyOrErrorBehavior, String) = {
+      mode: SqlJsonEmptyOrError): (JsonValueOnEmptyOrError, String) = {
    operands.indexWhere(expr => expr.literalValue.contains(mode)) match {
-      case -1 => (SqlJsonValueEmptyOrErrorBehavior.NULL, null)
+      case -1 => (JsonValueOnEmptyOrError.NULL, null)
 
      case modeIndex => operands(modeIndex - 1).literalValue.get match {
        // Case for [NULL | ERROR] ON [EMPTY | ERROR]
-        case behavior: SqlJsonValueEmptyOrErrorBehavior => (behavior, null)
+        case behavior: SqlJsonValueEmptyOrErrorBehavior => (convertCalciteEnum(behavior), null)
 
        case _ => operands(modeIndex - 2).literalValue.get match {
          // Case for DEFAULT <expr> ON [EMPTY | ERROR]
          case behavior: SqlJsonValueEmptyOrErrorBehavior =>
-            (behavior, operands(modeIndex - 1).resultTerm)
+            (convertCalciteEnum(behavior), operands(modeIndex - 1).resultTerm)
 
          case _ =>
            throw new CodeGenException("Invalid combination of arguments for JSON_VALUE. " +
              "This is a bug. Please consider filing an issue.")
@@ -98,4 +100,11 @@ class JsonValueCallGen extends CallGenerator {
      }
    }
  }
+
+  private def convertCalciteEnum(
+      behavior: SqlJsonValueEmptyOrErrorBehavior): JsonValueOnEmptyOrError = behavior match {
+    case SqlJsonValueEmptyOrErrorBehavior.ERROR => JsonValueOnEmptyOrError.ERROR
+    case SqlJsonValueEmptyOrErrorBehavior.NULL => JsonValueOnEmptyOrError.NULL
+    case SqlJsonValueEmptyOrErrorBehavior.DEFAULT => JsonValueOnEmptyOrError.DEFAULT
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
index 49b7dc2..cd8035d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
@@ -19,14 +19,12 @@
 package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.functions.SqlLikeUtils
-import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, qualifyMethod}
 import org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.runtime.functions.SqlLikeChainChecker
 import org.apache.flink.table.types.logical.{BooleanType, LogicalType}
 
-import org.apache.calcite.runtime.SqlFunctions
-
 import java.util.regex.Pattern
 
 /**
@@ -161,12 +159,11 @@ class LikeCallGen extends CallGenerator {
       terms =>
         val str1 = s"${terms.head}.toString()"
         val str2 = s"${terms(1)}.toString()"
-        val clsName = className[SqlFunctions]
         if (terms.length == 2) {
-          s"$clsName.like($str1, $str2)"
+          s"${qualifyMethod(BuiltInMethods.STRING_LIKE)}($str1, $str2)"
         } else {
           val str3 = s"${terms(2)}.toString()"
-          s"$clsName.like($str1, $str2, $str3)"
+          s"${qualifyMethod(BuiltInMethods.STRING_LIKE_WITH_ESCAPE)}($str1, $str2, $str3)"
         }
     }
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 7619486..9de99fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -29,7 +29,6 @@ import org.apache.flink.table.runtime.functions.SqlFunctionUtils
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString, isTimestamp, isTimestampWithLocalZone}
 import org.apache.flink.table.types.logical._
 
-import org.apache.calcite.runtime.SqlFunctions
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING}
 
@@ -341,9 +340,8 @@ object StringCallGen {
   def generateSimilarTo(
       ctx: CodeGeneratorContext,
       operands: Seq[GeneratedExpression]): GeneratedExpression = {
-    val className = classOf[SqlFunctions].getCanonicalName
     generateCallIfArgsNotNull(ctx, new BooleanType(), operands) {
-      terms => s"$className.similar(${toStringTerms(terms, operands)})"
+      terms => s"${qualifyMethod(BuiltInMethods.STRING_SIMILAR)}(${toStringTerms(terms, operands)})"
     }
   }
 
@@ -424,9 +422,8 @@ object StringCallGen {
       ctx: CodeGeneratorContext,
       operands: Seq[GeneratedExpression],
       returnType: LogicalType): GeneratedExpression = {
-    val className = classOf[SqlFunctions].getCanonicalName
     generateStringResultCallIfArgsNotNull(ctx, operands, returnType) {
-      terms => s"$className.initcap(${terms.head}.toString())"
+      terms => s"${qualifyMethod(BuiltInMethods.STRING_INITCAP)}(${terms.head}.toString())"
     }
   }
diff --git a/flink-table/flink-table-runtime/pom.xml b/flink-table/flink-table-runtime/pom.xml
index a6a6f9a..0c85dcc 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -84,6 +84,14 @@ under the License.
 			<version>${janino.version}</version>
 		</dependency>
 
+		<!-- Jackson -->
+
+		<dependency>
+			<groupId>com.jayway.jsonpath</groupId>
+			<artifactId>json-path</artifactId>
+			<version>${jsonpath.version}</version>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -131,6 +139,36 @@ under the License.
 				</execution>
 			</executions>
 		</plugin>
+		<plugin>
+			<groupId>org.apache.maven.plugins</groupId>
+			<artifactId>maven-shade-plugin</artifactId>
+			<executions>
+				<execution>
+					<id>shade-flink</id>
+					<phase>package</phase>
+					<goals>
+						<goal>shade</goal>
+					</goals>
+					<configuration>
+						<artifactSet>
+							<includes combine.children="append">
+								<include>com.jayway.jsonpath:json-path</include>
+							</includes>
+						</artifactSet>
+						<relocations>
+							<relocation>
+								<pattern>com.jayway</pattern>
+								<shadedPattern>org.apache.flink.calcite.shaded.com.jayway</shadedPattern>
+							</relocation>
+							<relocation>
+								<pattern>com.fasterxml</pattern>
+								<shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml</shadedPattern>
+							</relocation>
+						</relocations>
+					</configuration>
+				</execution>
+			</executions>
+		</plugin>
 	</plugins>
 </build>
</project>
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index c9016eb..06628ba 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -47,6 +47,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.table.data.DecimalDataUtils.castFrom;
+import static org.apache.flink.table.data.DecimalDataUtils.castToIntegral;
 import static org.apache.flink.table.data.DecimalDataUtils.doubleValue;
 
 /**
@@ -140,6 +141,11 @@ public class SqlFunctionUtils {
         return Math.tanh(doubleValue(a));
     }
 
+    /** SQL <code>COT</code> operator applied to double values. */
+    public static double cot(double b0) {
+        return 1.0d / Math.tan(b0);
+    }
+
     public static double cot(DecimalData a) {
         return 1.0d / Math.tan(doubleValue(a));
     }
@@ -152,14 +158,92 @@ public class SqlFunctionUtils {
         return Math.toRadians(doubleValue(angdeg));
     }
 
+    /** SQL <code>ABS</code> operator applied to byte values. */
+    public static byte abs(byte b0) {
+        return (byte) Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to short values. */
+    public static short abs(short b0) {
+        return (short) Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to int values. */
+    public static int abs(int b0) {
+        return Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to long values. */
+    public static long abs(long b0) {
+        return Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to float values. */
+    public static float abs(float b0) {
+        return Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to double values. */
+    public static double abs(double b0) {
+        return Math.abs(b0);
+    }
+
     public static DecimalData abs(DecimalData a) {
         return DecimalDataUtils.abs(a);
     }
 
+    public static double floor(double b0) {
+        return Math.floor(b0);
+    }
+
+    public static float floor(float b0) {
+        return (float) Math.floor(b0);
+    }
+
+    /** SQL <code>FLOOR</code> operator applied to int values. */
+    public static int floor(int b0, int b1) {
+        int r = b0 % b1;
+        if (r < 0) {
+            r += b1;
+        }
+        return b0 - r;
+    }
+
+    /** SQL <code>FLOOR</code> operator applied to long values. */
+    public static long floor(long b0, long b1) {
+        long r = b0 % b1;
+        if (r < 0) {
+            r += b1;
+        }
+        return b0 - r;
+    }
+
     public static DecimalData floor(DecimalData a) {
         return DecimalDataUtils.floor(a);
     }
 
+    public static double ceil(double b0) {
+        return Math.ceil(b0);
+    }
+
+    public static float ceil(float b0) {
+        return (float) Math.ceil(b0);
+    }
+
+    /** SQL <code>CEIL</code> operator applied to int values. */
+    public static int ceil(int b0, int b1) {
+        int r = b0 % b1;
+        if (r > 0) {
+            r -= b1;
+        }
+        return b0 - r;
+    }
+
+    /** SQL <code>CEIL</code> operator applied to long values. */
+    public static long ceil(long b0, long b1) {
+        return floor(b0 + b1 - 1, b1);
+    }
+
     public static DecimalData ceil(DecimalData a) {
         return DecimalDataUtils.ceil(a);
     }
@@ -1103,6 +1187,24 @@ public class SqlFunctionUtils {
         return UUID.nameUUIDFromBytes(b).toString();
     }
 
+    /** SQL <code>TRUNCATE</code> operator applied to int values. */
+    public static int struncate(int b0) {
+        return struncate(b0, 0);
+    }
+
+    public static int struncate(int b0, int b1) {
+        return (int) struncate((long) b0, b1);
+    }
+
+    /** SQL <code>TRUNCATE</code> operator applied to long values. */
+    public static long struncate(long b0) {
+        return struncate(b0, 0);
+    }
+
+    public static long struncate(long b0, int b1) {
+        return castToIntegral(struncate(castFrom(b0, 38, 18), b1));
+    }
+
     /** SQL <code>TRUNCATE</code> operator applied to BigDecimal values. */
     public static DecimalData struncate(DecimalData b0) {
         return struncate(b0, 0);
@@ -1137,6 +1239,15 @@ public class SqlFunctionUtils {
         return (float) doubleValue(struncate(castFrom((double) b0, 38, 18), b1));
     }
 
+    /** SQL <code>TRUNCATE</code> operator applied to double values. */
+    public static double struncate(double b0) {
+        return struncate(b0, 0);
+    }
+
+    public static double struncate(double b0, int b1) {
+        return doubleValue(struncate(castFrom(b0, 38, 18), b1));
+    }
+
     /**
      * Compares two byte arrays in lexicographical order.
      *
@@ -1160,4 +1271,41 @@ public class SqlFunctionUtils {
         }
         return array1.length - array2.length;
     }
+
+    /** SQL INITCAP(string) function. */
+    public static String initcap(String s) {
+        // Assumes Alpha as [A-Za-z0-9]
+        // white space is treated as everything else.
+        final int len = s.length();
+        boolean start = true;
+        final StringBuilder newS = new StringBuilder();
+
+        for (int i = 0; i < len; i++) {
+            char curCh = s.charAt(i);
+            final int c = (int) curCh;
+            if (start) { // curCh is whitespace or first character of word.
+                if (c > 47 && c < 58) { // 0-9
+                    start = false;
+                } else if (c > 64 && c < 91) { // A-Z
+                    start = false;
+                } else if (c > 96 && c < 123) { // a-z
+                    start = false;
+                    curCh = (char) (c - 32); // Uppercase this character
+                }
+                // else {} whitespace
+            } else { // Inside of a word or white space after end of word.
+                if (c > 47 && c < 58) { // 0-9
+                    // noop
+                } else if (c > 64 && c < 91) { // A-Z
+                    curCh = (char) (c + 32); // Lowercase this character
+                } else if (c > 96 && c < 123) { // a-z
+                    // noop
+                } else { // whitespace
+                    start = true;
+                }
+            }
+            newS.append(curCh);
+        } // for each character in s
+        return newS.toString();
+    }
 }
+ if (c > 47 && c < 58) { // 0-9 + start = false; + } else if (c > 64 && c < 91) { // A-Z + start = false; + } else if (c > 96 && c < 123) { // a-z + start = false; + curCh = (char) (c - 32); // Uppercase this character + } + // else {} whitespace + } else { // Inside of a word or white space after end of word. + if (c > 47 && c < 58) { // 0-9 + // noop + } else if (c > 64 && c < 91) { // A-Z + curCh = (char) (c + 32); // Lowercase this character + } else if (c > 96 && c < 123) { // a-z + // noop + } else { // whitespace + start = true; + } + } + newS.append(curCh); + } // for each character in s + return newS.toString(); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java index 40641ee..7602a19 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java @@ -19,8 +19,14 @@ package org.apache.flink.table.runtime.functions; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.JsonExistsOnError; +import org.apache.flink.table.api.JsonQueryOnEmptyOrError; +import org.apache.flink.table.api.JsonQueryWrapper; +import org.apache.flink.table.api.JsonValueOnEmptyOrError; import org.apache.flink.table.api.TableException; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -30,6 +36,23 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Arra import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.json.JacksonJsonProvider; +import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; +import com.jayway.jsonpath.spi.mapper.MappingProvider; + +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * Utilities for JSON functions. * @@ -42,6 +65,14 @@ public class SqlJsonUtils { private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY) .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); + private static final Pattern JSON_PATH_BASE = + Pattern.compile( + "^\\s*(?<mode>strict|lax)\\s+(?<spec>.+)$", + Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); + private static final JacksonJsonProvider JSON_PATH_JSON_PROVIDER = new JacksonJsonProvider(); + private static final MappingProvider JSON_PATH_MAPPING_PROVIDER = new JacksonMappingProvider(); + + private SqlJsonUtils() {} /** Returns the {@link JsonNodeFactory} for creating nodes. 
*/ public static JsonNodeFactory getNodeFactory() { @@ -71,5 +102,489 @@ public class SqlJsonUtils { } } - private SqlJsonUtils() {} + public static Boolean jsonExists(String input, String pathSpec) { + return jsonExists(jsonApiCommonSyntax(input, pathSpec), JsonExistsOnError.FALSE); + } + + public static Boolean jsonExists( + String input, String pathSpec, JsonExistsOnError errorBehavior) { + return jsonExists(jsonApiCommonSyntax(input, pathSpec), errorBehavior); + } + + private static Boolean jsonExists(JsonPathContext context, JsonExistsOnError errorBehavior) { + if (context.hasException()) { + switch (errorBehavior) { + case TRUE: + return Boolean.TRUE; + case FALSE: + return Boolean.FALSE; + case ERROR: + throw toUnchecked(context.exc); + case UNKNOWN: + return null; + default: + throw illegalErrorBehaviorInJsonExistsFunc(errorBehavior.toString()); + } + } else { + return context.obj != null; + } + } + + public static Object jsonValue( + String input, + String pathSpec, + JsonValueOnEmptyOrError emptyBehavior, + Object defaultValueOnEmpty, + JsonValueOnEmptyOrError errorBehavior, + Object defaultValueOnError) { + return jsonValue( + jsonApiCommonSyntax(input, pathSpec), + emptyBehavior, + defaultValueOnEmpty, + errorBehavior, + defaultValueOnError); + } + + private static Object jsonValue( + JsonPathContext context, + JsonValueOnEmptyOrError emptyBehavior, + Object defaultValueOnEmpty, + JsonValueOnEmptyOrError errorBehavior, + Object defaultValueOnError) { + final Exception exc; + if (context.hasException()) { + exc = context.exc; + } else { + Object value = context.obj; + if (value == null || context.mode == PathMode.LAX && !isScalarObject(value)) { + switch (emptyBehavior) { + case ERROR: + throw emptyResultOfJsonValueFuncNotAllowed(); + case NULL: + return null; + case DEFAULT: + return defaultValueOnEmpty; + default: + throw illegalEmptyBehaviorInJsonValueFunc(emptyBehavior.toString()); + } + } else if (context.mode == PathMode.STRICT && !isScalarObject(value)) { + exc = scalarValueRequiredInStrictModeOfJsonValueFunc(value.toString()); + } else { + return value; + } + } + switch (errorBehavior) { + case ERROR: + throw toUnchecked(exc); + case NULL: + return null; + case DEFAULT: + return defaultValueOnError; + default: + throw illegalErrorBehaviorInJsonValueFunc(errorBehavior.toString()); + } + } + + public static String jsonQuery( + String input, + String pathSpec, + JsonQueryWrapper wrapperBehavior, + JsonQueryOnEmptyOrError emptyBehavior, + JsonQueryOnEmptyOrError errorBehavior) { + return jsonQuery( + jsonApiCommonSyntax(input, pathSpec), + wrapperBehavior, + emptyBehavior, + errorBehavior); + } + + private static String jsonQuery( + JsonPathContext context, + JsonQueryWrapper wrapperBehavior, + JsonQueryOnEmptyOrError emptyBehavior, + JsonQueryOnEmptyOrError errorBehavior) { + final Exception exc; + if (context.hasException()) { + exc = context.exc; + } else { + Object value; + if (context.obj == null) { + value = null; + } else { + switch (wrapperBehavior) { + case WITHOUT_ARRAY: + value = context.obj; + break; + case UNCONDITIONAL_ARRAY: + value = Collections.singletonList(context.obj); + break; + case CONDITIONAL_ARRAY: + if (context.obj instanceof Collection) { + value = context.obj; + } else { + value = Collections.singletonList(context.obj); + } + break; + default: + throw illegalWrapperBehaviorInJsonQueryFunc(wrapperBehavior.toString()); + } + } + if (value == null || context.mode == PathMode.LAX && isScalarObject(value)) { + switch (emptyBehavior) { + case ERROR: + 
throw emptyResultOfJsonQueryFuncNotAllowed(); + case NULL: + return null; + case EMPTY_ARRAY: + return "[]"; + case EMPTY_OBJECT: + return "{}"; + default: + throw illegalEmptyBehaviorInJsonQueryFunc(emptyBehavior.toString()); + } + } else if (context.mode == PathMode.STRICT && isScalarObject(value)) { + exc = arrayOrObjectValueRequiredInStrictModeOfJsonQueryFunc(value.toString()); + } else { + try { + return jsonize(value); + } catch (Exception e) { + exc = e; + } + } + } + switch (errorBehavior) { + case ERROR: + throw toUnchecked(exc); + case NULL: + return null; + case EMPTY_ARRAY: + return "[]"; + case EMPTY_OBJECT: + return "{}"; + default: + throw illegalErrorBehaviorInJsonQueryFunc(errorBehavior.toString()); + } + } + + public static boolean isJsonValue(String input) { + try { + dejsonize(input); + return true; + } catch (Exception e) { + return false; + } + } + + public static boolean isJsonObject(String input) { + try { + Object o = dejsonize(input); + return o instanceof Map; + } catch (Exception e) { + return false; + } + } + + public static boolean isJsonArray(String input) { + try { + Object o = dejsonize(input); + return o instanceof Collection; + } catch (Exception e) { + return false; + } + } + + public static boolean isJsonScalar(String input) { + try { + Object o = dejsonize(input); + return !(o instanceof Map) && !(o instanceof Collection); + } catch (Exception e) { + return false; + } + } + + private static boolean isScalarObject(Object obj) { + if (obj instanceof Collection) { + return false; + } + if (obj instanceof Map) { + return false; + } + return true; + } + + private static String jsonize(Object input) { + return JSON_PATH_JSON_PROVIDER.toJson(input); + } + + private static Object dejsonize(String input) { + return JSON_PATH_JSON_PROVIDER.parse(input); + } + + private static JsonValueContext jsonValueExpression(String input) { + try { + return JsonValueContext.withJavaObj(dejsonize(input)); + } catch (Exception e) { + return JsonValueContext.withException(e); + } + } + + private static JsonPathContext jsonApiCommonSyntax(String input, String pathSpec) { + return jsonApiCommonSyntax(jsonValueExpression(input), pathSpec); + } + + private static JsonPathContext jsonApiCommonSyntax(JsonValueContext input, String pathSpec) { + PathMode mode; + String pathStr; + try { + Matcher matcher = JSON_PATH_BASE.matcher(pathSpec); + if (!matcher.matches()) { + mode = PathMode.STRICT; + pathStr = pathSpec; + } else { + mode = PathMode.valueOf(matcher.group(1).toUpperCase(Locale.ROOT)); + pathStr = matcher.group(2); + } + DocumentContext ctx; + switch (mode) { + case STRICT: + if (input.hasException()) { + return JsonPathContext.withStrictException(pathSpec, input.exc); + } + ctx = + JsonPath.parse( + input.obj, + Configuration.builder() + .jsonProvider(JSON_PATH_JSON_PROVIDER) + .mappingProvider(JSON_PATH_MAPPING_PROVIDER) + .build()); + break; + case LAX: + if (input.hasException()) { + return JsonPathContext.withJavaObj(PathMode.LAX, null); + } + ctx = + JsonPath.parse( + input.obj, + Configuration.builder() + .options(Option.SUPPRESS_EXCEPTIONS) + .jsonProvider(JSON_PATH_JSON_PROVIDER) + .mappingProvider(JSON_PATH_MAPPING_PROVIDER) + .build()); + break; + default: + throw illegalJsonPathModeInPathSpec(mode.toString(), pathSpec); + } + try { + return JsonPathContext.withJavaObj(mode, ctx.read(pathStr)); + } catch (Exception e) { + return JsonPathContext.withStrictException(pathSpec, e); + } + } catch (Exception e) { + return JsonPathContext.withUnknownException(e); + } + } + 
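+ // ------------------------------------------------------------------------ + // Exception helpers. These mirror the error messages of the Calcite JSON + // functions this class was copied from, but raise FlinkRuntimeException + // directly and format eagerly via String.format (%s placeholders rather + // than Calcite's MessageFormat-style resource templates). + // ------------------------------------------------------------------------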
+ private static RuntimeException toUnchecked(Exception e) { + if (e instanceof RuntimeException) { + return (RuntimeException) e; + } + return new RuntimeException(e); + } + + private static RuntimeException illegalJsonPathModeInPathSpec( + String pathMode, String pathSpec) { + return new FlinkRuntimeException( + String.format( + "Illegal jsonpath mode '%s' in jsonpath spec: '%s'", + pathMode, pathSpec)); + } + + private static RuntimeException illegalJsonPathMode(String pathMode) { + return new FlinkRuntimeException(String.format("Illegal jsonpath mode '%s'", pathMode)); + } + + private static RuntimeException illegalJsonPathSpec(String pathSpec) { + return new FlinkRuntimeException( + String.format( + "Illegal jsonpath spec '%s', format of the spec should be: '<lax|strict> ${expr}'", + pathSpec)); + } + + private static RuntimeException strictPathModeRequiresNonEmptyValue() { + return new FlinkRuntimeException( + "Strict jsonpath mode requires a non-empty returned value, but it is null"); + } + + private static RuntimeException illegalErrorBehaviorInJsonExistsFunc(String errorBehavior) { + return new FlinkRuntimeException( + String.format( + "Illegal error behavior '%s' specified in JSON_EXISTS function", + errorBehavior)); + } + + private static RuntimeException emptyResultOfJsonValueFuncNotAllowed() { + return new FlinkRuntimeException("Empty result of JSON_VALUE function is not allowed"); + } + + private static RuntimeException illegalEmptyBehaviorInJsonValueFunc(String emptyBehavior) { + return new FlinkRuntimeException( + String.format( + "Illegal empty behavior '%s' specified in JSON_VALUE function", + emptyBehavior)); + } + + private static RuntimeException illegalErrorBehaviorInJsonValueFunc(String errorBehavior) { + return new FlinkRuntimeException( + String.format( + "Illegal error behavior '%s' specified in JSON_VALUE function", + errorBehavior)); + } + + private static RuntimeException scalarValueRequiredInStrictModeOfJsonValueFunc(String value) { + return new FlinkRuntimeException( + String.format( + "Strict jsonpath mode requires a scalar value, and the actual value is: '%s'", + value)); + } + + private static RuntimeException illegalWrapperBehaviorInJsonQueryFunc(String wrapperBehavior) { + return new FlinkRuntimeException( + String.format( + "Illegal wrapper behavior '%s' specified in JSON_QUERY function", + wrapperBehavior)); + } + + private static RuntimeException emptyResultOfJsonQueryFuncNotAllowed() { + return new FlinkRuntimeException("Empty result of JSON_QUERY function is not allowed"); + } + + private static RuntimeException illegalEmptyBehaviorInJsonQueryFunc(String emptyBehavior) { + return new FlinkRuntimeException( + String.format( + "Illegal empty behavior '%s' specified in JSON_QUERY function", + emptyBehavior)); + } + + private static RuntimeException arrayOrObjectValueRequiredInStrictModeOfJsonQueryFunc( + String value) { + return new FlinkRuntimeException( + String.format( + "Strict jsonpath mode requires an array or object value, and the actual value is: '%s'", + value)); + } + + private static RuntimeException illegalErrorBehaviorInJsonQueryFunc(String errorBehavior) { + return new FlinkRuntimeException( + String.format( + "Illegal error behavior '%s' specified in JSON_QUERY function", + errorBehavior)); + } + + /** + * Path spec has two different modes: lax mode and strict mode. Lax mode suppresses any thrown + * exception and returns null, whereas strict mode throws exceptions.
+ */ + public enum PathMode { + LAX, + STRICT, + UNKNOWN, + NONE + } + + /** Path context returned by {@code jsonApiCommonSyntax}. */ + private static class JsonPathContext { + public final PathMode mode; + public final Object obj; + public final Exception exc; + + private JsonPathContext(Object obj, Exception exc) { + this(PathMode.NONE, obj, exc); + } + + private JsonPathContext(PathMode mode, Object obj, Exception exc) { + assert obj == null || exc == null; + this.mode = mode; + this.obj = obj; + this.exc = exc; + } + + public boolean hasException() { + return exc != null; + } + + public static JsonPathContext withUnknownException(Exception exc) { + return new JsonPathContext(PathMode.UNKNOWN, null, exc); + } + + public static JsonPathContext withStrictException(Exception exc) { + return new JsonPathContext(PathMode.STRICT, null, exc); + } + + public static JsonPathContext withStrictException(String pathSpec, Exception exc) { + if (exc.getClass() == InvalidPathException.class) { + exc = illegalJsonPathSpec(pathSpec); + } + return withStrictException(exc); + } + + public static JsonPathContext withJavaObj(PathMode mode, Object obj) { + if (mode == PathMode.UNKNOWN) { + throw illegalJsonPathMode(mode.toString()); + } + if (mode == PathMode.STRICT && obj == null) { + throw strictPathModeRequiresNonEmptyValue(); + } + return new JsonPathContext(mode, obj, null); + } + + @Override + public String toString() { + return "JsonPathContext{" + "mode=" + mode + ", obj=" + obj + ", exc=" + exc + '}'; + } + } + + private static class JsonValueContext { + @JsonValue public final Object obj; + public final Exception exc; + + private JsonValueContext(Object obj, Exception exc) { + assert obj == null || exc == null; + this.obj = obj; + this.exc = exc; + } + + public static JsonValueContext withJavaObj(Object obj) { + return new JsonValueContext(obj, null); + } + + public static JsonValueContext withException(Exception exc) { + return new JsonValueContext(null, exc); + } + + public boolean hasException() { + return exc != null; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JsonValueContext jsonValueContext = (JsonValueContext) o; + return Objects.equals(obj, jsonValueContext.obj); + } + + @Override + public int hashCode() { + return Objects.hash(obj); + } + + @Override + public String toString() { + return Objects.toString(obj); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java new file mode 100644 index 0000000..583b3c2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.generated; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to create {@link + * org.apache.flink.api.common.eventtime.WatermarkGenerator}. The {@link + * GeneratedWatermarkGeneratorSupplier} uses the {@link Context} to initialize the generated + * watermark generator. + */ +@Internal +public class GeneratedWatermarkGeneratorSupplier implements WatermarkGeneratorSupplier<RowData> { + private static final long serialVersionUID = 1L; + + private final Configuration configuration; + private final GeneratedWatermarkGenerator generatedWatermarkGenerator; + + public GeneratedWatermarkGeneratorSupplier( + Configuration configuration, GeneratedWatermarkGenerator generatedWatermarkGenerator) { + this.configuration = configuration; + this.generatedWatermarkGenerator = generatedWatermarkGenerator; + } + + @Override + public org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> + createWatermarkGenerator(Context context) { + + List<Object> references = + new ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences())); + references.add(context); + + WatermarkGenerator innerWatermarkGenerator = + new GeneratedWatermarkGenerator( + generatedWatermarkGenerator.getClassName(), + generatedWatermarkGenerator.getCode(), + references.toArray(), + configuration) + .newInstance(Thread.currentThread().getContextClassLoader()); + + try { + innerWatermarkGenerator.open(configuration); + } catch (Exception e) { + throw new RuntimeException("Failed to open the generated watermark generator.", e); + } + return new GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator( + innerWatermarkGenerator); + } + + /** Wrapper of the code-generated {@link WatermarkGenerator}.
*/ + public static class DefaultWatermarkGenerator + implements org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> { + private static final long serialVersionUID = 1L; + + private final WatermarkGenerator innerWatermarkGenerator; + private Long currentWatermark = Long.MIN_VALUE; + + public DefaultWatermarkGenerator(WatermarkGenerator watermarkGenerator) { + this.innerWatermarkGenerator = watermarkGenerator; + } + + @Override + public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) { + try { + Long watermark = innerWatermarkGenerator.currentWatermark(event); + if (watermark != null) { + currentWatermark = watermark; + } + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Generated WatermarkGenerator failed to generate a watermark for row: %s.", + event), + e); + } + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) { + output.emitWatermark(new Watermark(currentWatermark)); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java new file mode 100644 index 0000000..a379b2b --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.interval; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.Collector; + +/** Function filtering out all the input records.
*/ +@Internal +public class FilterAllFlatMapFunction + implements FlatMapFunction<RowData, RowData>, ResultTypeQueryable<RowData> { + private static final long serialVersionUID = 1L; + + private final InternalTypeInfo<RowData> outputTypeInfo; + + public FilterAllFlatMapFunction(InternalTypeInfo<RowData> inputTypeInfo) { + this.outputTypeInfo = inputTypeInfo; + } + + @Override + public void flatMap(RowData value, Collector<RowData> out) {} + + @Override + public TypeInformation<RowData> getProducedType() { + return outputTypeInfo; + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java new file mode 100644 index 0000000..ecd84cb --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.interval; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +/** Function performing left padding. 
*/ +@Internal +public class PaddingLeftMapFunction + implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> { + private static final long serialVersionUID = 1L; + + private final OuterJoinPaddingUtil paddingUtil; + private final InternalTypeInfo<RowData> outputTypeInfo; + + public PaddingLeftMapFunction( + OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> returnType) { + this.paddingUtil = paddingUtil; + this.outputTypeInfo = returnType; + } + + @Override + public RowData map(RowData value) { + return paddingUtil.padLeft(value); + } + + @Override + public TypeInformation<RowData> getProducedType() { + return outputTypeInfo; + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java new file mode 100644 index 0000000..5cb02dd --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.interval; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +/** Function performing right padding. 
*/ +@Internal +public class PaddingRightMapFunction + implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> { + private static final long serialVersionUID = 1L; + + private final OuterJoinPaddingUtil paddingUtil; + private final InternalTypeInfo<RowData> outputTypeInfo; + + public PaddingRightMapFunction( + OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> returnType) { + this.paddingUtil = paddingUtil; + this.outputTypeInfo = returnType; + } + + @Override + public RowData map(RowData value) { + return paddingUtil.padRight(value); + } + + @Override + public TypeInformation<RowData> getProducedType() { + return outputTypeInfo; + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java new file mode 100644 index 0000000..3ed9556 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.wmassigners; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner; + +import javax.annotation.Nullable; + +/** Generates periodic watermarks based on a {@link PeriodicWatermarkAssigner}. */ +@Internal +public class PeriodicWatermarkAssignerWrapper implements AssignerWithPeriodicWatermarks<RowData> { + private static final long serialVersionUID = 1L; + private final PeriodicWatermarkAssigner assigner; + private final int timeFieldIdx; + + /** + * @param timeFieldIdx the index of the rowtime attribute. + * @param assigner the watermark assigner. 
+ */ + public PeriodicWatermarkAssignerWrapper(PeriodicWatermarkAssigner assigner, int timeFieldIdx) { + this.assigner = assigner; + this.timeFieldIdx = timeFieldIdx; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return assigner.getWatermark(); + } + + @Override + public long extractTimestamp(RowData row, long recordTimestamp) { + long timestamp = row.getTimestamp(timeFieldIdx, 3).getMillisecond(); + assigner.nextTimestamp(timestamp); + return 0; + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java new file mode 100644 index 0000000..a3f99e2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.wmassigners; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.types.Row; + +import javax.annotation.Nullable; + +/** Generates punctuated watermarks based on a {@link PunctuatedWatermarkAssigner}. */ +@Internal +public class PunctuatedWatermarkAssignerWrapper + implements AssignerWithPunctuatedWatermarks<RowData> { + private static final long serialVersionUID = 1L; + private final PunctuatedWatermarkAssigner assigner; + private final int timeFieldIdx; + private final DataFormatConverters.DataFormatConverter<RowData, Row> converter; + + /** + * @param timeFieldIdx the index of the rowtime attribute. + * @param assigner the watermark assigner.
+ * @param sourceType the type of the source + */ + @SuppressWarnings("unchecked") + public PunctuatedWatermarkAssignerWrapper( + PunctuatedWatermarkAssigner assigner, int timeFieldIdx, DataType sourceType) { + this.assigner = assigner; + this.timeFieldIdx = timeFieldIdx; + DataType originDataType; + if (sourceType instanceof FieldsDataType) { + originDataType = sourceType; + } else { + originDataType = DataTypes.ROW(DataTypes.FIELD("f0", sourceType)); + } + converter = + DataFormatConverters.getConverterForDataType(originDataType.bridgedTo(Row.class)); + } + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(RowData row, long extractedTimestamp) { + long timestamp = row.getLong(timeFieldIdx); + return assigner.getWatermark(converter.toExternal(row), timestamp); + } + + @Override + public long extractTimestamp(RowData element, long recordTimestamp) { + return 0; + } +} diff --git a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..33fe52a --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE @@ -0,0 +1,9 @@ +flink-table-runtime +Copyright 2014-2021 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- com.jayway.jsonpath:json-path:2.6.0 diff --git a/flink-table/pom.xml b/flink-table/pom.xml index 63f6371..850931e 100644 --- a/flink-table/pom.xml +++ b/flink-table/pom.xml @@ -72,9 +72,10 @@ under the License. </dependencyManagement> <properties> - <!-- When updating Janino, make sure that Calcite supports it as well. --> - <janino.version>3.0.11</janino.version> <calcite.version>1.26.0</calcite.version> + <!-- Keep Janino in sync with Calcite. --> + <janino.version>3.0.11</janino.version> + <jsonpath.version>2.6.0</jsonpath.version> <guava.version>29.0-jre</guava.version> </properties> </project>
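
For reviewers, a minimal self-contained sketch (not part of the commit) of how the JSON functions copied from Calcite and the new SqlFunctionUtils helpers behave from plain Java. Class and method names are taken from the diff above; the expected outputs in the comments follow from reading the code and are illustrative only:

import org.apache.flink.table.api.JsonQueryOnEmptyOrError;
import org.apache.flink.table.api.JsonQueryWrapper;
import org.apache.flink.table.api.JsonValueOnEmptyOrError;
import org.apache.flink.table.runtime.functions.SqlFunctionUtils;
import org.apache.flink.table.runtime.functions.SqlJsonUtils;

public class TableRuntimeFunctionsSketch {
    public static void main(String[] args) {
        String json = "{\"a\": {\"b\": [1, 2, 3]}}";

        // JSON_EXISTS: the two-argument overload defaults to FALSE on error.
        System.out.println(SqlJsonUtils.jsonExists(json, "lax $.a.b")); // expected: true
        System.out.println(SqlJsonUtils.jsonExists(json, "lax $.a.c")); // expected: false

        // JSON_VALUE: lax mode, NULL on empty and on error; scalar results
        // are returned as Java objects.
        Object v =
                SqlJsonUtils.jsonValue(
                        json,
                        "lax $.a.b[0]",
                        JsonValueOnEmptyOrError.NULL, null,
                        JsonValueOnEmptyOrError.NULL, null);
        System.out.println(v); // expected: 1

        // JSON_QUERY: arrays and objects are serialized back to a JSON string.
        String q =
                SqlJsonUtils.jsonQuery(
                        json,
                        "lax $.a.b",
                        JsonQueryWrapper.WITHOUT_ARRAY,
                        JsonQueryOnEmptyOrError.NULL,
                        JsonQueryOnEmptyOrError.NULL);
        System.out.println(q); // expected: [1,2,3]

        // Two-argument FLOOR/CEIL round down/up to a multiple of the second
        // argument; INITCAP capitalizes the first letter of each word.
        System.out.println(SqlFunctionUtils.floor(7, 3)); // expected: 6
        System.out.println(SqlFunctionUtils.ceil(7, 3)); // expected: 9
        System.out.println(SqlFunctionUtils.initcap("hello wOrLD")); // expected: Hello World
    }
}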