Well, I thought that in order to do the same with only the DataStream API, I
would need to use MapPartitionFunction.
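
The computation only needs the previous handlingTime of the same transactionId, not a whole partition. In the DataStream API this kind of per-key logic is commonly written as keyBy(transactionId) followed by a KeyedProcessFunction that keeps the previous value in keyed state and uses event-time timers to release buffered events once the watermark passes them. The block below is a minimal plain-Java simulation of that idea, not Flink code, under these assumptions: TreeMaps stand in for keyed state, the hypothetical process() and finish() methods stand in for processElement() and the end-of-input watermark, and the watermark is recomputed after every event (largest timestamp seen minus the bound minus 1, as in Flink's bounded-out-of-orderness generator) rather than periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency) of a per-transactionId "elapsed since previous
// event" computation. PerKeyLag, process() and finish() are hypothetical names; the TreeMaps
// stand in for the keyed state a KeyedProcessFunction would use after keyBy(transactionId).
class PerKeyLag {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    // Per key: events not yet covered by the watermark, ordered by handlingTime.
    private final Map<String, PriorityQueue<Long>> buffered = new TreeMap<>();
    // Per key: handlingTime of the last emitted event (what LAG(handlingTime, 1) would return).
    private final Map<String, Long> previous = new TreeMap<>();
    // Emitted results, formatted as "transactionId@handlingTime=elapsed".
    final List<String> output = new ArrayList<>();

    PerKeyLag(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    // Watermark = largest timestamp seen - bound - 1 (Long.MIN_VALUE before the first event).
    long watermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - outOfOrdernessMillis - 1;
    }

    void process(String transactionId, long handlingTime) {
        if (handlingTime <= watermark()) {
            return; // late: outside the out-of-orderness bound, so it is ignored
        }
        buffered.computeIfAbsent(transactionId, k -> new PriorityQueue<>()).add(handlingTime);
        maxTimestamp = Math.max(maxTimestamp, handlingTime);
        emitUpTo(watermark());
    }

    // End of input: flush everything that is still buffered.
    void finish() {
        emitUpTo(Long.MAX_VALUE);
    }

    private void emitUpTo(long watermark) {
        for (Map.Entry<String, PriorityQueue<Long>> entry : buffered.entrySet()) {
            PriorityQueue<Long> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peek() <= watermark) {
                long t = queue.poll();
                Long prev = previous.put(entry.getKey(), t);
                // First event of a key gets 0, like handlingTime - IFNULL(LAG(...), handlingTime).
                output.add(entry.getKey() + "@" + t + "=" + (prev == null ? 0 : t - prev));
            }
        }
    }
}
```

For example, with a bound of 2 and the inputs a@10, a@8, b@12, a@5, a@15 followed by finish(), a@5 is dropped as late and output is [a@8=0, a@10=2, b@12=0, a@15=5]: each key's events come out in handlingTime order even though a@8 arrived after a@10. Because only events at or below the watermark are emitted, a newly accepted event can never precede one that was already emitted for its key.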



On Thu, 17 Feb 2022 at 10:41, Francesco Guardiani <france...@ververica.com> wrote:

> Why do you need MapPartitionFunction?
>
> On Wed, Feb 16, 2022 at 7:02 PM HG <hanspeter.sl...@gmail.com> wrote:
>
>> Thanks
>>
>> Would the option for DataStream be to write a MapPartitionFunction?
>>
>> On Wed, 16 Feb 2022 at 16:35, Francesco Guardiani <france...@ververica.com> wrote:
>>
>>> > Which does not work, since it cannot find the lag function :-(
>>>
>>> LAG ... OVER is not supported in the Table API at the moment, so you need
>>> to use SQL for that.
>>>
>>> > *Will this obey the watermark strategy of the original DataStream?
>>> (see further below)*
>>>
>>> Yes. The code at the end of the mail is correct and should work fine. I
>>> have just one comment: if you're using this DataStream only to create the
>>> Table instance, you could also define the watermark directly with the Schema
>>> builder, as described here:
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>>>
>>> On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:
>>>
>>>> Hello all
>>>>
>>>> I need to calculate the time difference between consecutive rows, ordered
>>>> by handlingTime, per transactionId. All events should arrive within the
>>>> timeframe set by the out-of-orderness (a couple of minutes). Events outside
>>>> this timeframe should be ignored.
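
With a bounded-out-of-orderness strategy, "outside the timeframe" has a precise meaning: the watermark trails the largest timestamp seen so far by the bound plus 1 ms (this is how Flink's forBoundedOutOfOrderness generator computes it), and an event at or below the current watermark is considered late. Whether a late event is then dropped depends on the downstream operator. A minimal plain-Java sketch of the rule follows, assuming "a couple of minutes" means 2 minutes and ignoring that Flink emits watermarks periodically rather than after every event; LatenessRule and its methods are hypothetical names.

```java
import java.time.Duration;

// Lateness rule implied by a bounded-out-of-orderness watermark strategy.
// LatenessRule, watermark() and isLate() are hypothetical names, not Flink API.
class LatenessRule {
    // The watermark trails the largest timestamp seen so far by the bound plus 1 ms.
    static long watermark(long maxTimestampSeen, Duration outOfOrderness) {
        return maxTimestampSeen - outOfOrderness.toMillis() - 1;
    }

    // An event whose timestamp is at or below the current watermark is late.
    static boolean isLate(long eventTimestamp, long maxTimestampSeen, Duration outOfOrderness) {
        return eventTimestamp <= watermark(maxTimestampSeen, outOfOrderness);
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofMinutes(2); // assumption: "a couple of minutes" = 2 minutes
        long newest = 1_000_000L;               // hypothetical newest handlingTime, epoch millis
        System.out.println(isLate(newest - 120_000L, newest, bound)); // exactly 2 minutes behind
        System.out.println(isLate(newest - 120_001L, newest, bound)); // 1 ms further behind
    }
}
```

Running main prints false and then true: the watermark is 879,999, so an event may trail the newest one by exactly the bound and still be accepted, while one more millisecond makes it late.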
>>>>
>>>> In SQL this would be:
>>>>
>>>> select transactionId, handlingTime,
>>>>        handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
>>>> from table
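
To make the intended semantics concrete: lag(handlingTime, 1) returns the handlingTime of the preceding row within the same transactionId partition, ordered by handlingTime, and NULL for the first row of each partition. The sketch below computes the same elapsedTime on an in-memory list in plain Java. It is a batch illustration under those assumptions, not how Flink executes the query (Flink processes the stream incrementally); Row and LagSemantics are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of
//   handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime)
// Row and LagSemantics are hypothetical names, not Flink classes.
class LagSemantics {
    static final class Row {
        final String transactionId;
        final long handlingTime;
        final Long elapsedTime; // null when there is no previous row in the partition

        Row(String transactionId, long handlingTime, Long elapsedTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
            this.elapsedTime = elapsedTime;
        }
    }

    static List<Row> withElapsedTime(List<Row> input) {
        List<Row> sorted = new ArrayList<>(input);
        // partition by transactionId, order by handlingTime
        sorted.sort(Comparator.comparing((Row r) -> r.transactionId)
                .thenComparingLong(r -> r.handlingTime));
        Map<String, Long> previous = new HashMap<>();
        List<Row> result = new ArrayList<>();
        for (Row r : sorted) {
            // put() returns the prior value for this key, i.e. lag(handlingTime, 1)
            Long lag = previous.put(r.transactionId, r.handlingTime);
            Long elapsed = (lag == null) ? null : Long.valueOf(r.handlingTime - lag);
            result.add(new Row(r.transactionId, r.handlingTime, elapsed));
        }
        return result;
    }
}
```

For input rows t1@30, t2@5, t1@10, t1@25 the result, in order, is t1@10 (null), t1@25 (15), t1@30 (5), t2@5 (null).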
>>>>
>>>> When I code:
>>>>
>>>> Table result = tableEnv.sqlQuery(
>>>>         "select transactionId, handlingTime, "
>>>>       + "handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime) as elapsedTime "
>>>>       + "from tupled3DsTableVw");
>>>>
>>>> *Will this obey the watermark strategy of the original DataStream? (see
>>>> further below)*
>>>>
>>>> I have also tried to use the Table API with a session window, like this:
>>>> Table t = tupled3DsTable
>>>>         .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
>>>>         .groupBy($("transactionId"), $("w"))
>>>>         .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>>>                 $("handlingTime").max().over($("w")));
>>>> This gives:
>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>>>
>>>> and also:
>>>> Table t = tupled3DsTable
>>>>         .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
>>>>         .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>>>                 $("handlingTime").lag().as("previousHandlingTime"));
>>>> Which does not work, since it cannot find the lag function :-(
>>>>
>>>> In Java I have the following setup:
>>>> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy = WatermarkStrategy
>>>>         .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(
>>>>                 Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>>>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>>>         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>>>>             @Override
>>>>             public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
>>>>                 return element.f0;
>>>>             }
>>>>         });
>>>>
>>>> DataStream<Tuple3<Long, String, String>> tuple3dswm =
>>>>         tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>>>
>>>> // Pass the stream that carries the watermarks so SOURCE_WATERMARK() can propagate them
>>>> Table tupled3DsTable = tableEnv.fromDataStream(tuple3dswm,
>>>>         Schema.newBuilder()
>>>>                 .column("f0", "TIMESTAMP_LTZ(3)")
>>>>                 .column("f1", "STRING")
>>>>                 .column("f2", "STRING")
>>>>                 .watermark("f0", "SOURCE_WATERMARK()")
>>>>                 .build())
>>>>         .as("handlingTime", "transactionId", "originalEvent");