Hi Sumeet,

Thanks for sharing. As Dian suggested, you could use b as one of your `group_by` keys, so that b can be output directly. I think that is simpler.

Best,
Guowei
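To illustrate why grouping by (window, b) makes b directly available in the output, here is a minimal plain-Python sketch of the tumbling-window aggregation discussed in the thread below (illustration only, not PyFlink; the sample values follow Sumeet's example, and the timestamps are assumed to be one second apart):

```python
from collections import defaultdict

# Rows are (timestamp_seconds, a, b, c); the tumbling window is 10 seconds.
rows = [
    (0, "a0", "b0", 1), (1, "a0", "b1", 2), (2, "a0", "b2", 3),
    (3, "a0", "b0", 6), (4, "a0", "b1", 7), (5, "a0", "b2", 8),
]

WINDOW = 10
groups = defaultdict(list)
for ts, a, b, c in rows:
    window_start = (ts // WINDOW) * WINDOW
    # Grouping by (window, b) makes b directly usable in the output;
    # a is constant per input, so adding it to the key does not change
    # the grouping, but makes it accessible too.
    groups[(window_start, a, b)].append(c)

result = sorted(
    (a, b, sum(cs) / len(cs)) for (w, a, b), cs in groups.items()
)
print(result)
# [('a0', 'b0', 3.5), ('a0', 'b1', 4.5), ('a0', 'b2', 5.5)]
```

The same idea carries over to the Table API: any column named in `group_by` (including a constant metadata column) is well-defined per output row and can therefore appear in `select` without an aggregate.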
On Mon, Apr 19, 2021 at 7:31 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Sumeet,
>
> Thanks for sharing.
>
> Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since the value of input.a is always the same, this is logically equal to `group_by(col('w'), input.b)`. The benefit is that you can access input.a directly in the select clause.
>
> Regards,
> Dian
>
> On Apr 19, 2021, at 6:29 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>
> Hi Guowei,
>
> Let me elaborate the use case with an example.
>
> A sample input table looks like this:
>
> time  a   b   c
> ---------------
> t0    a0  b0  1
> t1    a0  b1  2
> t2    a0  b2  3
> t3    a0  b0  6
> t4    a0  b1  7
> t5    a0  b2  8
>
> Basically, every time interval there are new readings from a fixed set of sensors (b0, b1 and b2). All these rows have a few constant fields representing metadata about the input (a0).
>
> The desired output for every time interval is the average reading for every sensor (b0, b1, b2), along with the constant metadata (a0):
>
> a0  b0  avg(c)
> a0  b1  avg(c)
> a0  b2  avg(c)
>
> This is what I was trying to build using a simple Tumble window:
>
> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>     .group_by(col('w'), input.b) \
>     .select(
>         input.a,  # <=== constant metadata field, same for every input record
>         input.b,  # <=== group_by field, to compute averages
>         input.c.avg.alias('avg_value')) \
>     .execute_insert('MySink') \
>     .wait()
>
> The example above is highly simplified, but I hope it explains what I'm trying to achieve.
>
> Thanks,
> Sumeet
>
> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Sumeet,
>>
>> 1) Regarding the above exception, it's a known issue and has been fixed in FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> [1]. It will be available in the coming 1.12.3. You could also cherry-pick that fix onto 1.12.2 and build from source following the instructions described in [2] if needed.
>> 2) Regarding your requirement, could you describe what you want to do with a group window or an over window?
>> For a group window (e.g. tumble window, hop window, session window, etc.), it outputs one row for multiple input rows belonging to the same window. You cannot just pass a field through from input to sink, as it is non-deterministic which of the multiple input rows the value should come from. That's the reason why you have to declare a field in the group_by clause if you want to access it directly in the select clause. For an over window, it outputs one row for each input row, so you can pass fields through directly.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21922
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>>
>> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>
>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>
>> input \
>>     .over_window(
>>         Over.partition_by(col(input.a))
>>             .order_by(input.Timestamp)
>>             .preceding(lit(10).seconds)
>>             .alias('w')) \
>>     .select(
>>         input.b,
>>         input.c.avg.over(col('w'))) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> But I'm running into the following exception:
>>
>> py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.table.api.Over.partitionBy. Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>
>> Is there any extra jar that needs to be included for Over Windows? From the code it doesn't appear so.
>>
>> Thanks,
>> Sumeet
>>
>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Sumeet
>>>
>>> Maybe you could try Over Windows [1], which keep the "non-group-key" columns.
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>>
>>> Best,
>>> Guowei
>>>
>>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>
>>>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any issues. It's only when I want to use "input.b".
>>>>
>>>> My use case is to basically emit "input.b" in the final sink as is, without performing any aggregation on that column; more like a pass-through from input to sink. What's the best way to achieve this? I was thinking that making it part of the select() clause would do it, but as you said, some aggregation needs to be performed on it.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>>> Hi, Sumeet
>>>>> For "input.b" I think you should aggregate the non-group-key column [1].
>>>>> But I am not sure why "input.c.avg.alias('avg_value')" has resolution errors. Would you mind giving more detailed error information?
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a use case where I'm creating a tumbling window as follows.
>>>>>>
>>>>>> The "input" table has columns [Timestamp, a, b, c]:
>>>>>>
>>>>>> input \
>>>>>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>>>>>     .group_by(col('w'), input.a) \
>>>>>>     .select(
>>>>>>         col('w').start.alias('window_start'),
>>>>>>         col('w').end.alias('window_end'),
>>>>>>         input.b,
>>>>>>         input.c.avg.alias('avg_value')) \
>>>>>>     .execute_insert('MySink') \
>>>>>>     .wait()
>>>>>>
>>>>>> This throws an exception that it cannot resolve the fields "b" and "c" inside the select statement. If I mention these column names inside the group_by() statement as follows:
>>>>>>
>>>>>> .group_by(col('w'), input.a, input.b, input.c)
>>>>>>
>>>>>> then the column names in the subsequent select statement can be resolved.
>>>>>>
>>>>>> Basically, unless a column name is explicitly made part of the group_by() clause, the subsequent select() clause doesn't resolve it. This is very similar to the example in Flink's documentation [1], where a similar construct works.
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples
>>>>>>
>>>>>> Any idea how I can access columns from the input stream without having to mention them in the group_by() clause? I really don't want to group the results by those fields, but they need to be written to the sink eventually.
>>>>>>
>>>>>> Thanks,
>>>>>> Sumeet
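To make the group-window vs. over-window distinction from the thread concrete, here is a minimal plain-Python sketch of over-window semantics (illustration only, not the PyFlink API; row values and the 10-second preceding range are assumed for the example): each input row yields exactly one output row, so non-key columns such as a and b pass through unchanged.

```python
# Rows are (timestamp_seconds, a, b, c).
rows = [
    (0, "a0", "b0", 1), (3, "a0", "b0", 6), (12, "a0", "b0", 9),
]

PRECEDING = 10
out = []
for ts, a, b, c in rows:
    # Average c over rows whose timestamp lies within the preceding
    # 10-second range (inclusive of the current row).
    window = [c2 for ts2, _, _, c2 in rows if ts - PRECEDING <= ts2 <= ts]
    # One output row per input row: a and b are carried through as-is,
    # which is why an over window does not force them into a group key.
    out.append((ts, a, b, sum(window) / len(window)))
print(out)
# [(0, 'a0', 'b0', 1.0), (3, 'a0', 'b0', 3.5), (12, 'a0', 'b0', 7.5)]
```

Contrast this with a group window, which collapses all rows of a window into a single output row; there, any column not in the group key has no single well-defined value, which is the resolution error the thread starts from.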