Thanks for the feedback. 

The CSV is a good idea and will make my tests more readable, I'll use that. 
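For the archive, here is roughly what I have in mind for the CSV option. It's only a sketch (the path and column list below are placeholders I made up, and since the csv format only handles flat columns I would flatten the nested "created" row in the test file and adapt the query accordingly):

// Sketch only: path and schema are placeholders; the nested "created" row
// from my real schema is flattened here because csv is a flat format.
var createSourceDdl = ""
        + " CREATE TEMPORARY TABLE post_events_kafka (                        \n"
        + "   group_id STRING,                                                \n"
        + "   event_time TIMESTAMP(3),                                        \n"
        + "   WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE   \n"
        + " ) WITH (                                                          \n"
        + "   'connector' = 'filesystem',                                     \n"
        + "   'path' = '/path/to/test-events.csv',                            \n"
        + "   'format' = 'csv'                                                \n"
        + " )";
tableEnv.executeSql(createSourceDdl);

(I guess the datagen option would look much the same, with 'connector' = 'datagen', no path, and generated values instead of a fixed file.)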

Looking forward to Flink 1.13!
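(If I've read the 1.13 changes correctly, fromDataStream(stream, Schema) should replace most of the boilerplate below; the Schema builder calls here are my own guess and untested:)

// Rough 1.13 sketch: declare the rowtime column and watermark on the schema
// while converting the stream, instead of assignTimestampsAndWatermarks.
// (Schema is org.apache.flink.table.api.Schema, new in 1.13.)
var testDataTable = tableEnv.fromDataStream(
        testStream,
        Schema.newBuilder()
                .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
                .watermark("rowtime", "rowtime - INTERVAL '10' MINUTE")
                .build());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);

(The windowing query would then group on "rowtime" instead of "event_time".)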

Svend 




On Fri, 30 Apr 2021, at 9:09 AM, Timo Walther wrote:
> Hi,
> 
> there are multiple ways to create a table for testing:
> 
> - use the datagen connector
> - use the filesystem connector with CSV data
> - and beginning with Flink 1.13, your code snippet becomes much simpler
> 
> Regards,
> Timo
> 
> On 29.04.21 20:35, Svend wrote:
> > I found an answer to my own question!
> > 
> > For future reference, the snippet below creates a SQL table with a
> > nested field and a watermark, filled with hard-coded values, which is
> > all I need in order to test SQL expressions.
> > 
> > It's quite a mouthful though; is there a more succinct way to express
> > the same thing?
> > 
> > 
> > var testData = List.of(
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
> > );
> > var testStream = streamEnv
> >     .fromCollection(testData,
> >         Types.ROW_NAMED(new String[] {"created", "event_time"},
> >             Types.ROW_NAMED(new String[] {"fandom_id"}, Types.STRING),
> >             Types.SQL_TIMESTAMP))
> >     .assignTimestampsAndWatermarks(WatermarkStrategy
> >         .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
> >         .withTimestampAssigner(
> >             TimestampAssignerSupplier.of(
> >                 (t2, t) -> ((Timestamp) t2.getField(1)).getTime())));
> > var testDataTable = tableEnv.fromDataStream(testStream,
> >     $("created"), $("event_time").rowtime());
> > tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> > 
> > 
> > 
> > 
> > 
> > On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> >> I'm trying to write a Java unit test for a Flink SQL application using
> >> the Flink mini cluster, but I haven't managed to create an input table
> >> with nested fields and time characteristics.
> >>
> >> I had a look at the documentation and examples below, although I'm 
> >> still struggling:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> >> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> >>
> >>
> >> Consider, for example, this simple expression that I want to test. It
> >> depends on the nested field "created.group_id" and expects "event_time"
> >> to be the row time:
> >>
> >>
> >> var createTableDDl = ""
> >>         + " CREATE TEMPORARY VIEW postCreated10min                             \n"
> >>         + " AS                                                                  \n"
> >>         + " SELECT                                                              \n"
> >>         + "   created.group_id as groupId,                                      \n"
> >>         + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime,      \n"
> >>         + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime,     \n"
> >>         + "   count(1) as metricValue                                           \n"
> >>         + " FROM post_events_kafka                                              \n"
> >>         + " GROUP BY                                                            \n"
> >>         + "   created.group_id,                                                 \n"
> >>         + "   TUMBLE(event_time, INTERVAL '10' MINUTES)                         \n";
> >> tableEnv.executeSql(createTableDDl);
> >>
> >>
> >> In a unit test, the following syntax allows me to create test input
> >> data with nested fields, but I have not found how to specify the row
> >> time or watermarks with this approach:
> >>
> >>
> >> Table testTable = tableEnv.fromValues(
> >>   DataTypes.ROW(
> >>     DataTypes.FIELD("created",
> >>       DataTypes.ROW(
> >>         DataTypes.FIELD("group_id", DataTypes.STRING())
> >>       )
> >>     ),
> >>     DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
> >>   ),
> >>
> >>   row(row("group123"), "2021-02-03 11:36:20"),
> >>   row(row("group123"), "2021-02-03 11:38:20"),
> >>   row(row("group123"), "2021-02-03 11:40:20")
> >> );
> >> tableEnv.createTemporaryView("post_events_kafka", testTable);
> >>
> >>
> >> I have also tried the following syntax, which allows specifying the
> >> watermark and row time, but I have not found how to create a nested
> >> field with this approach:
> >>
> >>
> >> var testData = List.of(
> >>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
> >>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
> >>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> >> );
> >> var testStream = streamEnv
> >>   .fromCollection(testData)
> >>   .assignTimestampsAndWatermarks(WatermarkStrategy
> >>   .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
> >>   .withTimestampAssigner(
> >>     TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
> >>   );
> >> var testDataTable = tableEnv.fromDataStream(
> >>   testStream,
> >>   $("group_id"), $("true_as_of"), $("event_time").rowtime()
> >> );
> >> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> >>
> >>
> >>
> >> What am I missing?
> > 
> 
> 
