Well, I thought that to do the same with only the DataStream API, I would need to use a MapPartitionFunction.
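On the MapPartitionFunction question: MapPartitionFunction is what the DataSet API's `mapPartition` takes. With the DataStream API, one common pattern is to key the stream by transactionId and use a KeyedProcessFunction that buffers events in keyed state and emits them in event-time order when an event-time timer (registered via `ctx.timerService().registerEventTimeTimer(...)`) fires as the watermark passes. What follows is only a plain-Java simulation of that idea, with no Flink dependency, so it can be run and tested in isolation. The class, record, and method names are hypothetical. The watermark formula mirrors Flink's bounded-out-of-orderness generator (highest timestamp seen, minus the delay, minus 1 ms). Events at or behind the watermark are counted as late and ignored, and the first event of each transaction gets an elapsed time of 0.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java simulation (no Flink) of a keyed, event-time-ordered "elapsed time" computation.
// All names here are hypothetical and for illustration only.
class ElapsedPerTransaction {

    // One input event: event time in milliseconds plus the key.
    record Event(long handlingTime, String transactionId) {}

    // One output row: the event plus the time since the previous event of the same transaction.
    record Elapsed(String transactionId, long handlingTime, long elapsedTime) {}

    private final long outOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    private int lateEvents = 0;
    // Per-key buffer ordered by event time (stands in for keyed state).
    private final Map<String, PriorityQueue<Event>> buffers = new HashMap<>();
    // Per-key handlingTime of the last emitted event (the "lag" value).
    private final Map<String, Long> lastEmitted = new HashMap<>();
    private final List<Elapsed> results = new ArrayList<>();

    ElapsedPerTransaction(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
    }

    void process(Event e) {
        if (e.handlingTime() <= watermark) {
            lateEvents++; // at or behind the watermark: too late, ignore it
            return;
        }
        buffers.computeIfAbsent(e.transactionId(),
                k -> new PriorityQueue<>(Comparator.comparingLong(Event::handlingTime))).add(e);
        maxTimestamp = Math.max(maxTimestamp, e.handlingTime());
        // Bounded out-of-orderness: watermark = max seen timestamp - delay - 1 ms.
        long candidate = maxTimestamp - outOfOrdernessMs - 1;
        if (candidate > watermark) {
            watermark = candidate;
            flush(); // plays the role of event-time timers firing
        }
    }

    // End of input: advance the watermark to the maximum and emit everything left.
    void finish() {
        watermark = Long.MAX_VALUE;
        flush();
    }

    // Emit, per key and in event-time order, every buffered event at or before the watermark.
    private void flush() {
        for (Map.Entry<String, PriorityQueue<Event>> entry : buffers.entrySet()) {
            PriorityQueue<Event> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peek().handlingTime() <= watermark) {
                Event e = queue.poll();
                Long previous = lastEmitted.put(entry.getKey(), e.handlingTime());
                long elapsed = previous == null ? 0 : e.handlingTime() - previous;
                results.add(new Elapsed(e.transactionId(), e.handlingTime(), elapsed));
            }
        }
    }

    List<Elapsed> results() { return results; }

    int lateEvents() { return lateEvents; }
}
```

With a 2-minute bound (120000 ms), feeding (1000, A), (3000, B), (2000, A), (500000, A), (1500, A) and then calling `finish()` emits A at 1000 and 2000 (elapsed 0 and 1000), B at 3000 (elapsed 0) and A at 500000 (elapsed 498000), and counts the final A event as late.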
On Thu 17 Feb 2022 at 10:41, Francesco Guardiani <france...@ververica.com> wrote:
> Why do you need MapPartitionFunction?
>
> On Wed, Feb 16, 2022 at 7:02 PM HG <hanspeter.sl...@gmail.com> wrote:
>
>> Thanks
>>
>> Would the option for the DataStream API be to write a MapPartitionFunction?
>>
>> On Wed 16 Feb 2022 at 16:35, Francesco Guardiani <france...@ververica.com> wrote:
>>
>>> > Which does not work since it cannot find lag function :-(
>>>
>>> lag and over are not supported at the moment with the Table API, so you need to use SQL for that.
>>>
>>> > *Will this obey the watermark strategy of the original DataStream? (see further below)*
>>>
>>> Yes. The code at the end of the mail is correct and should work fine. I have just one comment: if you're using this DataStream only to create the Table instance, you could also just define the watermark using the Schema builder itself, as described here:
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>>>
>>> On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:
>>>
>>>> Hello all
>>>>
>>>> I need to calculate the difference in time between ordered rows per transactionId. All events should arrive within the timeframe set by the out-of-orderness (a couple of minutes). Events outside this time should be ignored.
>>>>
>>>> In SQL this would be:
>>>> select transactionId, handlingTime, handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
>>>> from table
>>>>
>>>> When I code:
>>>> Table result = tableEnv.sqlQuery("select transactionId, handlingTime, handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime) as elapsedTime from tupled3DsTableVw");
>>>>
>>>> *Will this obey the watermark strategy of the original DataStream? (see further below)*
>>>> I have also tried to use the Table API with a session window like:
>>>> Table t = tupled3DsTable
>>>>     .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
>>>>     .groupBy($("transactionId"), $("w"))
>>>>     .select($("handlingTime"), $("transactionId"), $("originalEvent"), $("handlingTime").max().over($("w")));
>>>> This gives:
>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>>>
>>>> and also:
>>>> Table t = tupled3DsTable
>>>>     .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
>>>>     .select($("handlingTime"), $("transactionId"), $("originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
>>>> Which does not work since it cannot find the lag function :-(
>>>>
>>>> In Java I have the following setup:
>>>> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy = WatermarkStrategy
>>>>     .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>>>     .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>>>     .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>>>>         @Override
>>>>         public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
>>>>             return element.f0;
>>>>         }
>>>>     });
>>>>
>>>> DataStream<Tuple3<Long, String, String>> tuple3dswm = tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>>>
>>>> // Use the stream that carries the timestamps and watermarks, so SOURCE_WATERMARK() has something to propagate.
>>>> Table tupled3DsTable = tableEnv.fromDataStream(tuple3dswm,
>>>>         Schema.newBuilder()
>>>>             .column("f0", "TIMESTAMP_LTZ(3)")
>>>>             .column("f1", "STRING")
>>>>             .column("f2", "STRING")
>>>>             .watermark("f0", "SOURCE_WATERMARK()")
>>>>             .build())
>>>>     .as("handlingTime", "transactionId", "originalEvent");
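To make the expected output of the lag query concrete, here is a small plain-Java sketch (no Flink involved) that computes `handlingTime - IFNULL(LAG(handlingTime) OVER (PARTITION BY transactionId ORDER BY handlingTime), handlingTime)` over a finite list of rows that have all arrived. The class and record names are hypothetical. It ignores watermarks and late events entirely and only illustrates the per-partition arithmetic: the first row of each transaction gets 0, and every later row gets the gap to its predecessor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (no Flink) of the per-transaction arithmetic of
//   handlingTime - IFNULL(LAG(handlingTime) OVER (PARTITION BY transactionId
//                                                ORDER BY handlingTime), handlingTime)
// on a finite input. Names are hypothetical.
class LagElapsedExample {

    record Row(String transactionId, long handlingTime) {}

    record Result(String transactionId, long handlingTime, long elapsedTime) {}

    static List<Result> elapsed(List<Row> rows) {
        // PARTITION BY transactionId (TreeMap keeps the partitions in key order).
        Map<String, List<Row>> partitions = new TreeMap<>();
        for (Row r : rows) {
            partitions.computeIfAbsent(r.transactionId(), k -> new ArrayList<>()).add(r);
        }
        List<Result> out = new ArrayList<>();
        for (List<Row> partition : partitions.values()) {
            // ORDER BY handlingTime
            partition.sort(Comparator.comparingLong(Row::handlingTime));
            Long previous = null; // LAG(handlingTime) is NULL on the first row
            for (Row r : partition) {
                long lag = previous == null ? r.handlingTime() : previous; // IFNULL(lag, handlingTime)
                out.add(new Result(r.transactionId(), r.handlingTime(), r.handlingTime() - lag));
                previous = r.handlingTime();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("T1", 100), new Row("T2", 50), new Row("T1", 40), new Row("T1", 160));
        elapsed(rows).forEach(System.out::println);
    }
}
```

For the rows (T1, 100), (T2, 50), (T1, 40), (T1, 160), transaction T1 yields elapsed times 0, 60 and 60 (at 40, 100 and 160), and T2 yields 0 at 50.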