I actually have already specified the data type hint, but that doesn't work:
public void accumulate(LastDecimalAccumulator accumulator,
> @DataTypeHint("DECIMAL(38, 18)") BigDecimal value)
> {
> if (value != null) {
> accumulator.f0 = value;
> }
> }
>
I did some experiments, and the results are:
*1. The accumulator also has to be specified by a type hint (I think the
documentation doesn't say that).*
*2. FunctionHint did the work; DataTypeHint did not.*
This is the code snippet:
@FunctionHint(
> input = {@DataTypeHint("DECIMAL(38,18)")},
> accumulator = @DataTypeHint(value = "RAW", bridgedTo =
> LastDecimalAccumulator.class),
> output = @DataTypeHint("DECIMAL(38,18)")
> )
> public static class LastDecimalAggFunction extends
> AggregateFunction<BigDecimal, LastDecimalAccumulator> {
>
> @Override
> public
> // @DataTypeHint("DECIMAL(38,18)")
> BigDecimal getValue(LastDecimalAccumulator accumulator) {
> return accumulator.f0;
> }
>
> @Override
> public
> // @DataTypeHint(value = "RAW", bridgedTo =
> LastDecimalAccumulator.class)
> LastDecimalAccumulator createAccumulator() {
> return new LastDecimalAccumulator(null);
> }
>
> public void accumulate(
> // @DataTypeHint(value = "RAW", bridgedTo =
> LastDecimalAccumulator.class)
> LastDecimalAccumulator accumulator,
> // @DataTypeHint("DECIMAL(38,18)")
> BigDecimal value) {
> if (value != null) {
> accumulator.f0 = value;
> }
> }
>
*The whole program:*
package poc.flink.table;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Timestamp;
import static org.apache.flink.table.api.Expressions.$;
public class PocLastDecimalJob {
private static final Logger LOGGER =
LoggerFactory.getLogger(PocLastDecimalJob.class);
/**
 * Accumulator used by {@code LastDecimalAggFunction}: a one-field tuple whose {@code f0}
 * holds the tracked value, or {@code null} when no value has been stored yet.
 */
public static class LastDecimalAccumulator extends Tuple1<BigDecimal> {

    /** Creates an accumulator whose {@code f0} is {@code null}. */
    public LastDecimalAccumulator() {
        super();
        this.f0 = null;
    }

    /**
     * Creates an accumulator pre-populated with the given value.
     *
     * @param f0 initial content of the tuple field; may be {@code null}
     */
    public LastDecimalAccumulator(BigDecimal f0) {
        super(f0);
    }
}
@FunctionHint(
input = {@DataTypeHint("DECIMAL(38,18)")},
accumulator = @DataTypeHint(value = "RAW", bridgedTo =
LastDecimalAccumulator.class),
output = @DataTypeHint("DECIMAL(38,18)")
)
public static class LastDecimalAggFunction extends
AggregateFunction<BigDecimal, LastDecimalAccumulator> {
@Override
public
// @DataTypeHint("DECIMAL(38,18)")
BigDecimal getValue(LastDecimalAccumulator accumulator) {
return accumulator.f0;
}
@Override
public
// @DataTypeHint(value = "RAW", bridgedTo =
LastDecimalAccumulator.class)
LastDecimalAccumulator createAccumulator() {
return new LastDecimalAccumulator(null);
}
public void accumulate(
// @DataTypeHint(value = "RAW", bridgedTo =
LastDecimalAccumulator.class)
LastDecimalAccumulator accumulator,
// @DataTypeHint("DECIMAL(38,18)")
BigDecimal value) {
if (value != null) {
accumulator.f0 = value;
}
}
public void merge(LastDecimalAccumulator accumulator,
Iterable<LastDecimalAccumulator> iterable) {
if (iterable != null) {
for (LastDecimalAccumulator item : iterable) {
accumulator.f0 = item.f0;
}
}
}
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT,
Types.BIG_DEC},
new String[] {"ts", "id", "val"});
DataStream<Row> dataStream = env.fromElements(
Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")),
Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")),
Row.of(new Timestamp(3_300L), 2, new BigDecimal("3"))
).assignTimestampsAndWatermarks(WatermarkStrategy
.<Row>forMonotonousTimestamps()
.withTimestampAssigner(((element, recordTimestamp) ->
((Timestamp) element.getField(0)).getTime()))
).returns(rowTypeInfo);
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new
LastDecimalAggFunction());
tableEnv.createTemporaryView("InputTable", dataStream,
$("ts").rowtime(), $("id"), $("val"));
Table resultTable = tableEnv.sqlQuery("" +
"SELECT id, LAST_DECIMAL(val) " +
"FROM InputTable " +
"GROUP BY HOP(ts, INTERVAL '1' SECOND, INTERVAL '1'
SECOND), id");
DataStream<Row> resultStream = tableEnv
.toRetractStream(resultTable, new RowTypeInfo(Types.INT,
Types.BIG_DEC))
.map((MapFunction<Tuple2<Boolean, Row>, Row>) value ->
value.f1);
resultStream.addSink(new SinkFunction<Row>() {
@Override
public void invoke(Row value, Context context) {
LOGGER.info("SinkFunction.invoke(): value={}", value);
}
});
env.execute();
}
}
Ingo Bürk <[email protected]> 于2021年7月30日周五 下午1:51写道:
> Hi,
>
> for BigDecimal you need to specify a type hint to define the precision and
> scale. For example, look at [1][2] and search for BigDecimal. Can you try
> with that?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/types/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/
>
>
> Best
> Ingo
>
> On Fri, Jul 30, 2021 at 5:12 AM LIU Xiao <[email protected]> wrote:
>
>> sorry for a little error, the program code should be:
>>
>> package poc.flink.table;
>>
>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>> import org.apache.flink.table.annotation.DataTypeHint;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.functions.AggregateFunction;
>> import org.apache.flink.types.Row;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.math.BigDecimal;
>> import java.sql.Timestamp;
>>
>> import static org.apache.flink.table.api.Expressions.$;
>>
>> public class PocLastDecimalJob {
>> private static final Logger LOGGER =
>> LoggerFactory.getLogger(PocLastDecimalJob.class);
>>
>> public static class LastDecimalAccumulator extends Tuple1<BigDecimal> {
>> public LastDecimalAccumulator(BigDecimal f0) {
>> super(f0);
>> }
>> }
>>
>> public static class LastDecimalAggFunction extends
>> AggregateFunction<BigDecimal, LastDecimalAccumulator> {
>>
>> @Override
>> public BigDecimal getValue(LastDecimalAccumulator accumulator) {
>> return accumulator.f0;
>> }
>>
>> @Override
>> public LastDecimalAccumulator createAccumulator() {
>> return new LastDecimalAccumulator(null);
>> }
>>
>> public void accumulate(LastDecimalAccumulator accumulator,
>> @DataTypeHint("DECIMAL(38, 18)") BigDecimal
>> value) {
>> if (value != null) {
>> accumulator.f0 = value;
>> }
>> }
>>
>> public void merge(LastDecimalAccumulator accumulator,
>> Iterable<BigDecimal> iterable) {
>> if (iterable != null) {
>> for (BigDecimal item : iterable) {
>> accumulator.f0 = item;
>> }
>> }
>> }
>> }
>>
>> public static void main(String[] args) throws Exception {
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>
>> RowTypeInfo rowTypeInfo = new RowTypeInfo(
>> new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT,
>> Types.BIG_DEC},
>> new String[] {"rowtime", "id", "val"});
>>
>> DataStream<Row> dataStream = env.fromElements(
>> Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")),
>> Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")),
>> Row.of(new Timestamp(3_300L), 2, new BigDecimal("3"))
>>
>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo);
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>> tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new
>> LastDecimalAggFunction());
>>
>> tableEnv.createTemporaryView("InputTable", dataStream,
>> $("rowtime").rowtime(), $("id"), $("val"));
>>
>> Table resultTable = tableEnv.sqlQuery("" +
>> "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " +
>> "FROM InputTable " +
>> "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1'
>> SECOND), id");
>>
>> DataStream<Row> resultStream = tableEnv
>> .toRetractStream(resultTable, new
>> RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC))
>> .map((MapFunction<Tuple2<Boolean, Row>, Row>) value ->
>> value.f1);
>>
>> resultStream.addSink(new SinkFunction<Row>() {
>> @Override
>> public void invoke(Row value, Context context) {
>> LOGGER.info("SinkFunction.invoke(): value={}", value);
>> }
>> });
>>
>> env.execute();
>> }
>> }
>>
>>
>> LIU Xiao <[email protected]> 于2021年7月30日周五 上午11:04写道:
>>
>>> I'm currently converting our old code (based on Flink 1.6) to Flink 1.13
>>> and encountered a strange problem about the user-defined aggregate function
>>> which takes BigDecimal as the parameter and output:
>>>
>>> Exception in thread "main"
>>>> org.apache.flink.table.api.ValidationException: SQL validation failed. An
>>>> error occurred in the type inference logic of function 'LAST_DECIMAL'.
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>>>> at poc.flink.table.PocTimestampUdf.main(PocTimestampUdf.java:101)
>>>> Caused by: org.apache.flink.table.api.ValidationException: An error
>>>> occurred in the type inference logic of function 'LAST_DECIMAL'.
>>>> at
>>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>>>> at
>>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>>>> at
>>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>>>> at java.base/java.util.Optional.flatMap(Optional.java:294)
>>>> at
>>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>>>> at
>>>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>>>> at
>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
>>>> at
>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
>>>> at
>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
>>>> at
>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
>>>> at
>>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>>> ... 5 more
>>>> Caused by: org.apache.flink.table.api.ValidationException: Could not
>>>> extract a valid type inference for function class
>>>> 'poc.flink.table.LastDecimalAggFunction'. Please check for implementation
>>>> mistakes and/or provide a corresponding hint.
>>>> at
>>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>>> at
>>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>>>> at
>>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
>>>> at
>>>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
>>>> at
>>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>>>> ... 16 more
>>>> Caused by: org.apache.flink.table.api.ValidationException: Error in
>>>> extracting a signature to output mapping.
>>>> at
>>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>>> at
>>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>>>> at
>>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>>>> at
>>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>>>> ... 19 more
>>>> Caused by: org.apache.flink.table.api.ValidationException: Unable to
>>>> extract a type inference from method:
>>>> public void
>>>> poc.flink.table.LastDecimalAggFunction.accumulate(poc.flink.table.LastDecimalAccumulator,java.math.BigDecimal)
>>>> at
>>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>>> at
>>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>>>> at
>>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
>>>> ... 21 more
>>>> Caused by: org.apache.flink.table.api.ValidationException: Could not
>>>> extract a data type from 'class java.math.BigDecimal' in generic class
>>>> 'org.apache.flink.table.functions.AggregateFunction' in class
>>>> poc.flink.table.LastDecimalAggFunction. Please pass the required data type
>>>> manually or allow RAW types.
>>>> at
>>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>>> at
>>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
>>>> at
>>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
>>>> at
>>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
>>>> at
>>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
>>>> at
>>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
>>>> at
>>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
>>>> at
>>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
>>>> at
>>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
>>>> ... 22 more
>>>> Caused by: org.apache.flink.table.api.ValidationException: Values of
>>>> 'java.math.BigDecimal' need fixed precision and scale.
>>>> at
>>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>>>> at
>>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
>>>> at
>>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractPredefinedType(DataTypeExtractor.java:398)
>>>> at
>>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:276)
>>>> at
>>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
>>>> ... 29 more
>>>>
>>>
>>> The program source code:
>>>
>>> package poc.flink.table;
>>>
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.api.java.tuple.Tuple1;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>>> import org.apache.flink.table.annotation.DataTypeHint;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.functions.AggregateFunction;
>>> import org.apache.flink.types.Row;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.math.BigDecimal;
>>> import java.sql.Timestamp;
>>>
>>> import static org.apache.flink.table.api.Expressions.$;
>>>
>>> public class PocLastDecimalJob {
>>> private static final Logger LOGGER =
>>> LoggerFactory.getLogger(PocLastDecimalJob.class);
>>>
>>> public static class LastDecimalAccumulator extends Tuple1<BigDecimal> {
>>> public LastDecimalAccumulator(BigDecimal f0) {
>>> super(f0);
>>> }
>>> }
>>>
>>> public static class LastDecimalAggFunction extends
>>> AggregateFunction<BigDecimal, LastDecimalAccumulator> {
>>>
>>> @Override
>>> public BigDecimal getValue(LastDecimalAccumulator accumulator) {
>>> return accumulator.f0;
>>> }
>>>
>>> @Override
>>> public LastDecimalAccumulator createAccumulator() {
>>> return new LastDecimalAccumulator(null);
>>> }
>>>
>>> public void accumulate(LastDecimalAccumulator accumulator,
>>> @DataTypeHint("DECIMAL(38, 18)") BigDecimal
>>> value) {
>>> if (value != null) {
>>> accumulator.f0 = value;
>>> }
>>> }
>>>
>>> public void merge(LastDecimalAccumulator accumulator,
>>> Iterable<BigDecimal> iterable) {
>>> if (iterable != null) {
>>> for (BigDecimal item : iterable) {
>>> accumulator.f0 = item;
>>> }
>>> }
>>> }
>>> }
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>>
>>> RowTypeInfo rowTypeInfo = new RowTypeInfo(
>>> new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT,
>>> Types.BIG_DEC},
>>> new String[] {"rowtime", "id", "val"});
>>>
>>> DataStream<Row> dataStream = env.fromElements(
>>> Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")),
>>> Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")),
>>> Row.of(new Timestamp(3_300L), 2, new BigDecimal("3"))
>>>
>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo);
>>>
>>> StreamTableEnvironment tableEnv =
>>> StreamTableEnvironment.create(env);
>>> tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new
>>> LastDecimalAggFunction());
>>>
>>> tableEnv.createTemporaryView("InputTable", dataStream,
>>> $("rowtime.rowtime"), $("id"), $("val"));
>>>
>>> Table resultTable = tableEnv.sqlQuery("" +
>>> "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " +
>>> "FROM InputTable " +
>>> "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1'
>>> SECOND), id");
>>>
>>> DataStream<Row> resultStream = tableEnv
>>> .toRetractStream(resultTable, new
>>> RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC))
>>> .map((MapFunction<Tuple2<Boolean, Row>, Row>) value ->
>>> value.f1);
>>>
>>> resultStream.addSink(new SinkFunction<Row>() {
>>> @Override
>>> public void invoke(Row value, Context context) {
>>> LOGGER.info("SinkFunction.invoke(): value={}", value);
>>> }
>>> });
>>>
>>> env.execute();
>>> }
>>> }
>>>
>>>