Hi Henri,

I just noticed that I had a tiny mistake in my little test program. So SELECT DISTINCT is officially supported. But the question if this is a valid append stream is still up for discussion. I will loop in Fabian (in CC).

For the general behavior you can also look into the code and especially the comments there [1].

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala


Am 2/6/18 um 1:36 PM schrieb Timo Walther:
Hi Henri,

I try to answer your question:

1) You are right, SELECT DISTINCT should not need a retract stream. Internally, this is translated into an aggregation without an aggregate function call. So this definitely needs improvement.

2) The problem is that SELECT DISTINCT is not officially supported nor tested. I opened an issue for this [1].

Until this issue is fixed I would recommend to implement a custom aggregate function that keeps track values seen so far [2].

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-8564
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions


Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:
Hi,

I have a use case where I would like to find distinct rows over certain period of time. Requirement is that new row is emitted asap. Otherwise the requirement is mainly to just filter out data to have smaller dataset for downstream. I noticed that SELECT DISTINCT and state retention time of 12 hours would in theory do the trick. You can find the code below. Few questions.

1) Why is SELECT DISTINCT creating a retract stream? In which scenarios we would get update/delete rows?

2) If I run the below code with the example data (also below) without state retention config I get the two append rows (expected). If I run exactly the code below (with the retention config) I'll get two appends and one delete for AN1234 and then one append for AN5555. What is going on?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

StreamQueryConfig qConfig = tableEnv.queryConfig();
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12));

// create a TableSource
CsvTableSource csvSource = CsvTableSource
.builder()
.path("data.csv")
.field("ts", Types.SQL_TIMESTAMP())
.field("aid1", Types.STRING())
.field("aid2", Types.STRING())
.field("advertiser_id", Types.STRING())
.field("platform_id", Types.STRING())
.fieldDelimiter(",")
.build();

tableEnv.registerTableSource("CsvTable", csvSource);

Table result = tableEnv.sqlQuery(
"SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(new String[] {"aid1", "aid2", "advertiser_id", "platform_id"}, new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()});

result.writeToSink(out, qConfig);

env.execute();


Here is a simple csv dataset of three rows:

2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890 2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890 2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891



Reply via email to