I found an answer to my own question!

For future reference, the snippet below creates a SQL table with a nested
field and a watermark, filled with hard-coded values, which is all I need in
order to test SQL expressions.

It's quite a mouthful though. Is there a more succinct way to express the
same thing?


var testData = List.of(
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
    .fromCollection(testData,
        Types.ROW_NAMED(new String[] {"created", "event_time"},
            // nested field name must match created.group_id in the query
            Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
            Types.SQL_TIMESTAMP
        )
    )
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
        .withTimestampAssigner(
            TimestampAssignerSupplier.of(
                (t2, t) -> ((Timestamp) (t2.getField(1))).getTime()))
    );
var testDataTable = tableEnv.fromDataStream(
    testStream, $("created"), $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);
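
To work out what the view should return for this test data, here is a minimal
sketch in plain Java (no Flink dependency) that redoes the tumbling-window
arithmetic by hand. The class and method names are hypothetical, and it
assumes the window size divides a day evenly, so that aligning windows to
midnight gives the same boundaries as aligning them to the epoch.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: reproduces by hand the window
// boundaries that TUMBLE / TUMBLE_END would assign to each event time.
class TumbleWindowSketch {

    // Exclusive end of the tumbling window that contains ts.
    // Assumption: size divides 24h evenly, so windows align to midnight.
    static LocalDateTime windowEnd(LocalDateTime ts, Duration size) {
        long sizeSeconds = size.getSeconds();
        long secondOfDay = ts.toLocalTime().toSecondOfDay();
        long windowStart = secondOfDay - (secondOfDay % sizeSeconds);
        return ts.toLocalDate().atStartOfDay().plusSeconds(windowStart + sizeSeconds);
    }

    // Number of events per window, keyed by window end (like count(1) per TUMBLE_END).
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> events, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime event : events) {
            counts.merge(windowEnd(event, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same three event times as the test data above.
        var events = List.of(
            LocalDateTime.parse("2021-02-03T11:36:20"),
            LocalDateTime.parse("2021-02-03T11:38:20"),
            LocalDateTime.parse("2021-02-03T11:40:20"));
        countPerWindow(events, Duration.ofMinutes(10))
            .forEach((end, count) -> System.out.println(end + " -> " + count));
    }
}
```

For the three rows above this gives a count of 2 for the window ending at
11:40:00 and a count of 1 for the window ending at 11:50:00, which is what I
would expect metricTime and metricValue to show once the bounded input ends
and the remaining windows are flushed.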




On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> I'm trying to write a Java unit test for a Flink SQL application using the
> Flink mini cluster, but I have not managed to create an input table with
> nested fields and time characteristics.
> 
> I had a look at the documentation and examples below, although I'm still 
> struggling:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> 
> 
> Consider for example this simple expression that I want to test, which
> depends on the nested field "created.group_id" and expects "event_time" to
> be the row time:
> 
> 
> var createTableDDl = ""
>         + " CREATE TEMPORARY VIEW postCreated10min                          \n"
>         + " AS                                                              \n"
>         + " SELECT                                                          \n"
>         + "   created.group_id as groupId,                                  \n"
>         + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime,  \n"
>         + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
>         + "   count(1) as metricValue                                       \n"
>         + " FROM post_events_kafka                                          \n"
>         + " GROUP BY                                                        \n"
>         + "   created.group_id,                                             \n"
>         + "   TUMBLE(event_time, INTERVAL '10' MINUTES)                     \n";
> tableEnv.executeSql(createTableDDl);
> 
> 
> In a unit test, the following syntax allows me to create test input data with
> nested fields, but I have not found how to specify the row time or watermarks
> with this approach:
> 
> 
> Table testTable = tableEnv.fromValues(
>   DataTypes.ROW(
>     DataTypes.FIELD("created",
>       DataTypes.ROW(
>         DataTypes.FIELD("group_id", DataTypes.STRING())
>       )
>     ),
>     DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
>   ),
>  
>   row(row("group123"), "2021-02-03 11:36:20"),
>   row(row("group123"), "2021-02-03 11:38:20"),
>   row(row("group123"), "2021-02-03 11:40:20")
> );
> tableEnv.createTemporaryView("post_events_kafka", testTable);
> 
> 
> I have also tried the following syntax, which lets me specify a watermark
> and row time, but I have not found how to create a nested field with this
> approach:
> 
> 
> var testData = List.of(
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> );
> var testStream = streamEnv
>   .fromCollection(testData)
>   .assignTimestampsAndWatermarks(WatermarkStrategy
>   .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
>   .withTimestampAssigner(
>     TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
>   );
> var testDataTable = tableEnv.fromDataStream(
>   testStream,
>   $("group_id"), $("true_as_of"), $("event_time").rowtime()
> );
> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> 
> 
> 
> What am I missing?
