Re: How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-30 Thread Svend
Thanks for the feedback. 

The CSV approach is a good idea and will make my tests more readable, so I'll use that.

Looking forward to Flink 1.13!

Svend 





Re: How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-30 Thread Timo Walther

Hi,

there are multiple ways to create a table for testing:

- use the datagen connector
- use the filesystem connector with CSV data
- and starting with Flink 1.13, your code snippet becomes much simpler

Regards,
Timo


Re: How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-29 Thread Svend
I found an answer to my own question!

For future reference, the snippet below creates a SQL table with a 
nested field and a watermark, filled with hard-coded values, which is all I 
need in order to test SQL expressions.

It's quite a mouthful though. Is there a more succinct way to express the same thing?


var testData = List.of(
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);
// Row type: created ROW<group_id STRING>, event_time TIMESTAMP
var testStream = streamEnv
    .fromCollection(testData,
        Types.ROW_NAMED(new String[] {"created", "event_time"},
            Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
            Types.SQL_TIMESTAMP
        )
    )
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
        .withTimestampAssigner(
            TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) (t2.getField(1))).getTime()))
    );
// Expose event_time as the rowtime attribute of the table
var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);
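
A side note for anyone reusing the snippet above in a test: with only three events and a 10-minute out-of-orderness bound, the periodic watermark on its own would probably never close the first window. The plain-Java sketch below works through that arithmetic. It rests on two assumptions about Flink's behaviour: that the bounded-out-of-orderness strategy emits "max timestamp seen - bound - 1 ms", and that a bounded source such as fromCollection emits a final maximum watermark at end of input. The class and method names are made up for illustration, and UTC is used for simplicity (Timestamp.valueOf uses the JVM's local zone).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class WatermarkSketch {

    /** Epoch millis of an ISO local date-time, interpreted as UTC (an assumption of this sketch). */
    static long millis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Watermark assumed to be emitted after the largest timestamp seen so far. */
    static long watermarkAfter(long maxTimestampMillis, Duration outOfOrderness) {
        return maxTimestampMillis - outOfOrderness.toMillis() - 1;
    }

    /** An event-time window [start, end) can fire once the watermark reaches end - 1. */
    static boolean windowCanFire(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofMinutes(10);
        long lastEvent = millis("2021-02-03T11:40:20");
        long firstWindowEnd = millis("2021-02-03T11:40:00"); // window [11:30, 11:40)

        long watermark = watermarkAfter(lastEvent, bound);
        System.out.println("fires on periodic watermark: "
            + windowCanFire(firstWindowEnd, watermark));
        System.out.println("fires on end-of-input watermark: "
            + windowCanFire(firstWindowEnd, Long.MAX_VALUE));
    }
}
```

If those assumptions hold, the window results in such a test only appear once the bounded input is exhausted, so the test should collect results after the job finishes rather than poll while it runs.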




On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> I'm trying to write a Java unit test for a Flink SQL application using a Flink 
> mini cluster, but I can't manage to create an input table with nested fields 
> and time attributes.
> 
> I had a look at the documentation and examples below, but I'm still 
> struggling:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> 
> 
> Consider for example this simple expression that I want to test, which 
> depends on the nested field "created.group_id" and expects "event_time" to 
> be the row time:
> 
> 
> var createTableDDl = ""
>     + " CREATE TEMPORARY VIEW postCreated10min \n"
>     + " AS \n"
>     + " SELECT \n"
>     + "   created.group_id as groupId, \n"
>     + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
>     + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
>     + "   count(1) as metricValue \n"
>     + " FROM post_events_kafka \n"
>     + " GROUP BY \n"
>     + "   created.group_id, \n"
>     + "   TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
> tableEnv.executeSql(createTableDDl);
> 
> 
> In a unit test, the following syntax allows me to create test input data with 
> nested fields, but I have not found how to specify row time or watermarks 
> with this approach:
> 
> 
> Table testTable = tableEnv.fromValues(
>   DataTypes.ROW(
>     DataTypes.FIELD("created",
>       DataTypes.ROW(
>         DataTypes.FIELD("group_id", DataTypes.STRING())
>       )
>     ),
>     DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
>   ),
> 
>   row(row("group123"), "2021-02-03 11:36:20"),
>   row(row("group123"), "2021-02-03 11:38:20"),
>   row(row("group123"), "2021-02-03 11:40:20")
> );
> tableEnv.createTemporaryView("post_events_kafka", testTable);
> 
> 
> I have also tried the following syntax, which allows me to specify a watermark 
> and row time, but I have not found how to create a nested field with this 
> approach:
> 
> 
> var testData = List.of(
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> );
> var testStream = streamEnv
>   .fromCollection(testData)
>   .assignTimestampsAndWatermarks(WatermarkStrategy
>     .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
>     .withTimestampAssigner(
>       TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
>   );
> var testDataTable = tableEnv.fromDataStream(
>   testStream,
>   $("group_id"), $("true_as_of"), $("event_time").rowtime()
> );
> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> 
> 
> 
> What am I missing?
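
To write assertions against the postCreated10min view, it can help to work out by hand what the three sample rows should produce. The sketch below does that in plain Java, without Flink. It assumes that 10-minute tumbling windows are aligned to the epoch and that the timestamps are treated as zone-less values (modelled here as UTC). ExpectedTumbleCounts, tumbleEnd and countPerWindow are hypothetical names, not Flink API.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedTumbleCounts {

    /** End of the epoch-aligned tumbling window that contains eventTime. */
    static LocalDateTime tumbleEnd(LocalDateTime eventTime, Duration size) {
        long ts = eventTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = ts - Math.floorMod(ts, size.toMillis());
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start + size.toMillis()), ZoneOffset.UTC);
    }

    /** Number of events per window, keyed by window end (the metricTime column of the view). */
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> eventTimes, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime t : eventTimes) {
            counts.merge(tumbleEnd(t, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same event times as the hard-coded rows for group123.
        List<LocalDateTime> events = List.of(
            LocalDateTime.parse("2021-02-03T11:36:20"),
            LocalDateTime.parse("2021-02-03T11:38:20"),
            LocalDateTime.parse("2021-02-03T11:40:20"));
        countPerWindow(events, Duration.ofMinutes(10))
            .forEach((end, count) -> System.out.println(end + " -> " + count));
    }
}
```

Under these assumptions the first two events fall into the window ending at 11:40:00 and the third into the window ending at 11:50:00, so a test would expect two result rows for group123 with counts 2 and 1.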