Hi, Sumeet
Thanks you for the sharing. As Dian suggested, I think you could use b as
your `group_by`'s key and so the b could be output directly.
I think it is more simple.
Best,
Guowei


On Mon, Apr 19, 2021 at 7:31 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Sumeet,
>
> Thanks for the sharing.
>
> Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since
> the value for input.a is always the same, it’s equal to group_by(col(‘w')
> , input.b) logically. The benefit is that you could access input.a
> directly in the select clause.
>
> Regards,
> Dian
>
> 2021年4月19日 下午6:29,Sumeet Malhotra <sumeet.malho...@gmail.com> 写道:
>
> Hi Guowei,
>
> Let me elaborate the use case with an example.
>
> Sample input table looks like this:
>
> time    a   b   c
> -----------------
> t0      a0  b0  1
> t1      a0  b1  2
> t2      a0  b2  3
> t3      a0  b0  6
> t4      a0  b1  7
> t5      a0  b2  8
>
> Basically, every time interval there are new readings from a fixed set of
> sensors (b0, b1 and b2). All these rows have a few constant fields
> representing metadata about the input (a0).
>
> Desired output for every time interval is the average reading for every
> sensor (b0, b1, b2), along with the constant metadata (a0):
>
> a0    b0    avg(c)
> a0    b1    avg(c)
> a0    b2    avg(c)
>
> This is what I was trying to build using a simple Tumble window:
>
> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w"))
> \
>     .group_by(col('w'), input.b) \
>     .select(
>         input.a,                            <=== constant metadata field,
> same for every input record
>         input.b,                            <=== group_by field, to
> compute averages
>         input.c.avg.alias('avg_value')) \
>     .execute_insert('MySink') \
>     .wait()
>
> The example above is highly simplified, but I hope it explains what I'm
> trying to achieve.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Sumeet,
>>
>> 1) Regarding to the above exception, it’s a known issue and has been
>> fixed in FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> 
>> [1]. It
>> will be available in the coming 1.12.3. You could also cherry-pick that fix
>> to 1.12.2 and build from source following the instruction described in [2]
>> if needed.
>>
>> 2) Regarding to your requirements, could you describe what you want to do
>> with group window or over window?
>> For group window(e.g. tumble window, hop window, session window, etc), it
>> will output one row for multiple inputs belonging to the same window. You
>> could not just passing through it from input to sink as it is
>> non-determinitic which row to use as there are multiple input rows. That’s
>> the reason why you have to declare a field in the group by clause if you
>> want to access it directly in the select clause. For over window, it will
>> output one row for each input and so you could pass through it directly.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21922.
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>>
>>
>> 2021年4月19日 下午5:16,Sumeet Malhotra <sumeet.malho...@gmail.com> 写道:
>>
>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>
>> input \
>>     .over_window(
>>         Over.partition_by(col(input.a)) \
>>         .order_by(input.Timestamp) \
>>         .preceding(lit(10).seconds) \
>>         .alias('w')) \
>>     .select(
>>         input.b,
>>         input.c.avg.over(col('w'))) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> But running into following exception:
>>
>> py4j.protocol.Py4JError: An error occurred while calling
>> z:org.apache.flink.table.api.Over.partitionBy. Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>
>> Is there any extra Jar that needs to be included for Over Windows. From
>> the code it doesn't appear so.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Sumeet
>>>
>>> Maybe you could try the Over Windows[1], which could keep the
>>> "non-group-key" column.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <
>>> sumeet.malho...@gmail.com> wrote:
>>>
>>>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause
>>>> any issues. It's only when I want to use "input.b".
>>>>
>>>> My use case is to basically emit "input.b" in the final sink as is, and
>>>> not really perform any aggregation on that column - more like pass through
>>>> from input to sink. What's the best way to achieve this? I was thinking
>>>> that making it part of the select() clause would do it, but as you said
>>>> there needs to be some aggregation performed on it.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>>
>>>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, Sumeet
>>>>>       For "input.b" I think you should aggregate the non-group-key
>>>>> column[1].
>>>>> But I am not sure why the "input.c.avg.alias('avg_value')"  has
>>>>> resolved errors. Would you mind giving more detailed error information?
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <
>>>>> sumeet.malho...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a use case where I'm creating a Tumbling window as follows:
>>>>>>
>>>>>> "input" table has columns [Timestamp, a, b, c]
>>>>>>
>>>>>> input \
>>>>>>
>>>>>> .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>>>>>     .group_by(col('w'), input.a) \
>>>>>>     .select(
>>>>>>         col('w').start.alias('window_start'),
>>>>>>         col('w').end.alias('window_end'),
>>>>>>         input.b,
>>>>>>         input.c.avg.alias('avg_value')) \
>>>>>>     .execute_insert('MySink') \
>>>>>>     .wait()
>>>>>>
>>>>>> This throws an exception that it cannot resolve the fields "b" and
>>>>>> "c" inside the select statement. If I mention these column names inside 
>>>>>> the
>>>>>> group_by() statement as follows:
>>>>>>
>>>>>> .group_by(col('w'), input.a, input.b, input.c)
>>>>>>
>>>>>> then the column names in the subsequent select statement can be
>>>>>> resolved.
>>>>>>
>>>>>> Basically, unless the column name is explicitly made part of the
>>>>>> group_by() clause, the subsequent select() clause doesn't resolve it. 
>>>>>> This
>>>>>> is very similar to the example from Flink's documentation here [1]:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
>>>>>> where a similar procedure works.
>>>>>>
>>>>>> Any idea how I can access columns from the input stream, without
>>>>>> having to mention them in the group_by() clause? I really don't want to
>>>>>> group the results by those fields, but they need to be written to the 
>>>>>> sink
>>>>>> eventually.
>>>>>>
>>>>>> Thanks,
>>>>>> Sumeet
>>>>>>
>>>>>
>>
>

Reply via email to