Hi. Thank you for the clarification.
I updated my code as below and got the desired result.
result = table.window(Slide.over(row_interval(WINDOW_SIZE))
                      .every(row_interval(WINDOW_SLIDE))
                      .on(col('proctime'))
                      .alias("w")) \
    .group_by(col('w')) \
    .select(call(read_raw_data, col('val')).sum)
However, I can only compute some aggregation over those bits and cannot
select the val column individually. If I remove the .sum like this:
.select(call(read_raw_data, col('val')))
I get this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o90.select.
: org.apache.flink.table.api.ValidationException: Cannot resolve field [val], input field list:[].
Do you know why?
On Tue, Nov 2, 2021 at 9:25 AM Caizhi Weng <[email protected]> wrote:
> Hi!
>
> You're not only grouping by the over window but also grouping by the
> value, thus only the records with the same value will be in the same group.
> I guess this is not intended.
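
To check my understanding of this point, here is a plain-Python sketch (not Flink code) of what grouping by the value does to the window sums. It assumes a window is emitted only once it holds WINDOW_SIZE rows, which is a simplification and may not match Flink's exact firing behaviour. The function names are made up for illustration.

```python
from collections import defaultdict, deque

WINDOW_SIZE = 5


def grouped_window_sums(bits, window_size=WINDOW_SIZE):
    """Simplified model of group_by(w, val): one count window per distinct value."""
    windows = defaultdict(lambda: deque(maxlen=window_size))
    sums = []
    for bit in bits:
        window = windows[bit]          # rows are routed by their own value
        window.append(bit)
        if len(window) == window_size:
            sums.append(sum(window))   # every row in this window has the same value
    return sums


def ungrouped_window_sums(bits, window_size=WINDOW_SIZE):
    """Simplified model of group_by(w): one count window over all rows."""
    window = deque(maxlen=window_size)
    sums = []
    for bit in bits:
        window.append(bit)
        if len(window) == window_size:
            sums.append(sum(window))
    return sums


sample_bits = [1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0]
print(grouped_window_sums(sample_bits))
print(ungrouped_window_sums(sample_bits))
```

In this model the grouped version can only ever produce 0 or WINDOW_SIZE, which matches what I was seeing, while the ungrouped version produces the mixed sums I expected.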
>
> Long Nguyễn <[email protected]> wrote on Tue, Nov 2, 2021 at 3:05 AM:
>
>> I have set up a program that reads bits (0 and 1) from a Kafka topic and
>> then uses Flink to create a sliding count window of size 5. For each window,
>> I'd like to output 1 if it contains three or more 1 bits, and 0 otherwise.
>> My current approach is to compute the sum of the bits in each window.
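
For reference, the per-window result I am after can be sketched in plain Python, without Flink. This is only an illustration of the intended logic: the function name is hypothetical, and emitting output only once the window is full is an assumption on my side.

```python
from collections import deque


def threshold_over_sliding_window(bits, window_size=5, threshold=3):
    """Emit 1 when a full window holds at least `threshold` ones, else 0."""
    window = deque(maxlen=window_size)  # oldest bit drops out automatically
    outputs = []
    for bit in bits:
        window.append(bit)
        if len(window) == window_size:
            outputs.append(1 if sum(window) >= threshold else 0)
    return outputs


print(threshold_over_sliding_window([1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0]))
```

Summing the bits and comparing the sum against the threshold is the same idea I am trying to express with the `.sum` aggregation in the Table API.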
>>
>> import os
>> from urllib.parse import quote
>>
>> from pyflink.common import Row
>> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
>> from pyflink.table.expressions import col, lit, row_interval
>> from pyflink.table.udf import udf
>> from pyflink.table.window import Slide
>>
>>
>> settings = EnvironmentSettings.new_instance() \
>>     .in_streaming_mode().use_blink_planner().build()
>> table_env = TableEnvironment.create(settings)
>>
>> kafka_sql_connector_jar_path = quote(os.path.join(
>>     os.path.abspath(os.path.dirname(__file__)),
>>     'flink-sql-connector-kafka_2.12-1.13.2.jar'))
>>
>> table_env.get_config() \
>>     .get_configuration() \
>>     .set_string("pipeline.jars",
>>                 "file://{}".format(kafka_sql_connector_jar_path))
>>
>> WINDOW_SIZE = 5
>> WINDOW_SLIDE = 1
>> THRESHOLD = 3
>>
>>
>> @udf(result_type=DataTypes.INT())
>> def read_raw_data(data):
>>     return int(data, base=0)
>>
>>
>> def sliding_window_demo():
>>     source_ddl = """
>>         CREATE TABLE input(
>>             val BINARY,
>>             proctime AS PROCTIME()
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = 'flink-demo-input',
>>             'properties.bootstrap.servers' = 'localhost:9092',
>>             'properties.group.id' = 'flink-demo',
>>             'scan.startup.mode' = 'earliest-offset',
>>             'format' = 'raw'
>>         )
>>     """
>>
>>     temp_ddl = """
>>         CREATE TABLE temp(
>>             res INT
>>         ) WITH (
>>             'connector' = 'print'
>>         )
>>     """
>>
>>     table_env.execute_sql(source_ddl)
>>     table_env.execute_sql(temp_ddl)
>>
>>     table = table_env.from_path('input')
>>
>>     result = table.window(Slide.over(row_interval(WINDOW_SIZE))
>>                           .every(row_interval(WINDOW_SLIDE))
>>                           .on(col('proctime'))
>>                           .alias("w")) \
>>         .group_by(col('w'), col('val')) \
>>         .select(read_raw_data(col('val')).sum)
>>
>>     result.execute_insert('temp').wait()
>>
>>
>> However, when I call the sum expression on those bits in the window,
>> every time I add a 0 to the stream, the result is always 0, and when I add
>> a 1, it always returns 5 (which is the window size).
>> Can you tell me what I'm doing wrong? Thank you so much.
>>
>> --
>> ------------------------------------------------------------
>> --------------------------------------------------
>> Nguyen Dich Long,
>> School of Information and Communication Technology (SoICT -
>> https://www.soict.hust.edu.vn)
>> Hanoi University of Science and Technology (https://www.hust.edu.vn)
>> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
>> Noi, Vietnam
>> Tel: +84 (0)3.54.41.76.76
>> Email: [email protected]; [email protected]
>>
>