Hi,
there are multiple ways to create a table for testing:
- use the datagen connector
- use the filesystem connector with CSV data
- and, starting with Flink 1.13, your code snippets become much simpler
Regards,
Timo
On 29.04.21 20:35, Svend wrote:
I found an answer to my own question!
For future reference, the snippet below lets me create a SQL table with a
nested field and a watermark, filled with hard-coded values, which is all
I need in order to test SQL expressions.
It's quite a mouthful though. Is there a more succinct way to express the
same thing?
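import static org.apache.flink.table.api.Expressions.$;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.List;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// Three events, each with a nested "created" row and an event timestamp
var testData = List.of(
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);
// Row field names cannot be inferred, so the (nested) row type is given explicitly
var testStream = streamEnv
    .fromCollection(testData,
        Types.ROW_NAMED(new String[] {"created", "event_time"},
            Types.ROW_NAMED(new String[] {"fandom_id"}, Types.STRING),
            Types.SQL_TIMESTAMP))
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
        .withTimestampAssigner(
            // Field 1 ("event_time") provides the event timestamp in epoch millis
            TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) t2.getField(1)).getTime())));
// Keep "created" and turn "event_time" into the rowtime attribute
var testDataTable = tableEnv.fromDataStream(testStream, $("created"),
    $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);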
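A note on what the `forBoundedOutOfOrderness(Duration.ofMinutes(10))` strategy does with this hard-coded data: a bounded-out-of-orderness generator reports a watermark equal to the largest timestamp seen so far, minus the bound, minus 1 ms (the rule used by Flink's `BoundedOutOfOrdernessWatermarks`). The plain-JDK sketch below replays that arithmetic for the three test events. `WatermarkSketch` and `watermarksAfterEachEvent` are hypothetical names, not Flink API, and the timestamps are treated as UTC so the result does not depend on the JVM time zone. In Flink the watermark is emitted periodically rather than after every record, so this is an idealized view.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    // Bounded-out-of-orderness rule: largest timestamp seen - bound - 1 ms.
    public static long watermark(long maxTimestampMillis, Duration bound) {
        return maxTimestampMillis - bound.toMillis() - 1;
    }

    // Idealized: the watermark the generator would report right after each event.
    public static List<LocalDateTime> watermarksAfterEachEvent(
            List<LocalDateTime> events, Duration bound) {
        List<LocalDateTime> watermarks = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (LocalDateTime event : events) {
            // Assumption: timestamps are interpreted as UTC to keep the result deterministic.
            long millis = event.toInstant(ZoneOffset.UTC).toEpochMilli();
            maxTimestamp = Math.max(maxTimestamp, millis);
            long wm = watermark(maxTimestamp, bound);
            watermarks.add(LocalDateTime.ofInstant(Instant.ofEpochMilli(wm), ZoneOffset.UTC));
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<LocalDateTime> events = List.of(
                LocalDateTime.parse("2021-02-03T11:36:20"),
                LocalDateTime.parse("2021-02-03T11:38:20"),
                LocalDateTime.parse("2021-02-03T11:40:20"));
        // Prints:
        // 2021-02-03T11:26:19.999
        // 2021-02-03T11:28:19.999
        // 2021-02-03T11:30:19.999
        for (LocalDateTime wm : watermarksAfterEachEvent(events, Duration.ofMinutes(10))) {
            System.out.println(wm);
        }
    }
}
```

None of these values reaches 11:40, so with a 10-minute bound the data alone would not close the window ending at 11:40. For a finite input such as `fromCollection`, Flink emits a final maximum watermark when the source finishes, and that is what lets the windows fire in a test.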
On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
I'm trying to write a Java unit test for a Flink SQL application using the
Flink mini cluster, but I can't manage to create an input table with
nested fields and time attributes.
I had a look at the documentation and examples below, but I'm still
struggling:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
Consider, for example, this simple expression that I want to test, which
depends on the nested field "created.group_id" and expects "event_time"
to be the row time:
var createTableDDl = ""
    + " CREATE TEMPORARY VIEW postCreated10min \n"
    + " AS \n"
    + " SELECT \n"
    + "   created.group_id as groupId, \n"
    + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
    + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
    + "   count(1) as metricValue \n"
    + " FROM post_events_kafka \n"
    + " GROUP BY \n"
    + "   created.group_id, \n"
    + "   TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
tableEnv.executeSql(createTableDDl);
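Working out the expected result by hand helps when writing the assertions for this view. The sketch below assigns each test event to a 10-minute tumbling window using the window-start formula of Flink's `TimeWindow.getWindowStartWithOffset` (with an offset of 0) and counts events per window end, mirroring `TUMBLE_END(...)` and `count(1)` for a single `group_id`. `TumbleSketch` and `countPerWindowEnd` are hypothetical helpers, not Flink API, and the timestamps are treated as UTC.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumbleSketch {

    // Same formula as Flink's TimeWindow.getWindowStartWithOffset, with offset = 0.
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis + sizeMillis) % sizeMillis;
    }

    // Counts events per tumbling-window end, like COUNT(1) ... GROUP BY TUMBLE(...) for one key.
    public static Map<LocalDateTime, Long> countPerWindowEnd(
            List<LocalDateTime> events, Duration size) {
        long sizeMillis = size.toMillis();
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime event : events) {
            // Assumption: timestamps are interpreted as UTC.
            long millis = event.toInstant(ZoneOffset.UTC).toEpochMilli();
            long end = windowStart(millis, sizeMillis) + sizeMillis;
            LocalDateTime windowEnd =
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(end), ZoneOffset.UTC);
            counts.merge(windowEnd, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<LocalDateTime> events = List.of(
                LocalDateTime.parse("2021-02-03T11:36:20"),
                LocalDateTime.parse("2021-02-03T11:38:20"),
                LocalDateTime.parse("2021-02-03T11:40:20"));
        // Prints {2021-02-03T11:40=2, 2021-02-03T11:50=1}
        System.out.println(countPerWindowEnd(events, Duration.ofMinutes(10)));
    }
}
```

For the three `group123` events used in this thread, the view should therefore eventually produce a count of 2 with `metricTime` 11:40 and a count of 1 with `metricTime` 11:50 (under the UTC assumption above).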
In a unit test, the following syntax lets me create test input data with
nested fields, but I haven't found a way to specify a row time or
watermarks with this approach:
Table testTable = tableEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("created",
            DataTypes.ROW(
                DataTypes.FIELD("group_id", DataTypes.STRING())
            )
        ),
        DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
    ),
    row(row("group123"), "2021-02-03 11:36:20"),
    row(row("group123"), "2021-02-03 11:38:20"),
    row(row("group123"), "2021-02-03 11:40:20")
);
tableEnv.createTemporaryView("post_events_kafka", testTable);
I have also tried the following syntax, which lets me specify a watermark
and row time, but I haven't found a way to create a nested field with this
approach:
var testData = List.of(
    Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
    Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
    Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
    .fromCollection(testData)
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
        .withTimestampAssigner(
            // f1 holds the event timestamp
            TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime())));
// f0 and f1 are renamed by position; "event_time" is appended as the rowtime attribute
var testDataTable = tableEnv.fromDataStream(
    testStream,
    $("group_id"), $("true_as_of"), $("event_time").rowtime()
);
tableEnv.createTemporaryView("post_events_kafka", testDataTable);
What am I missing?