Hi,
I have the following piece of code (for PyFlink v1.11):

t_env.from_path(INPUT_TABLE) \
    .select("monitorId, data, rowtime") \
    .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
    .group_by("five_sec_window, monitorId") \
    .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
    .execute_insert(OUTPUT_TABLE)

This generates the following exception:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
    .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
    return Table(self._j_table.select(fields), self._t_env)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.

The "rowtime" attribute in INPUT_TABLE is created as follows:

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(
    exec_env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

...

     .field("rowtime", DataTypes.TIMESTAMP(3))
        .rowtime(
            Rowtime()
            .timestamps_from_field("time_st")
            .watermarks_periodic_ascending())

).create_temporary_table(INPUT_TABLE)


What is wrong with the code? I believe that I have already indicated which
attribute has to be treated as the time attribute.
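
For reference, the result the query above is meant to produce can be sketched in plain Python, without Flink. This is only an illustration of the intended semantics (a 5-minute tumbling window grouped by monitorId, with avg/min/max over data), not a fix for the exception. The helper names window_start and aggregate are hypothetical, windows are assumed to be aligned to the Unix epoch, and results are keyed by window start, whereas Flink's window rowtime is derived from the window end.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # same size as Tumble.over("5.minutes")
EPOCH = datetime(1970, 1, 1)    # assumption: windows are aligned to the epoch


def window_start(ts, size=WINDOW):
    """Return the start of the tumbling window that contains ts."""
    return ts - (ts - EPOCH) % size


def aggregate(rows, size=WINDOW):
    """Group (monitor_id, data, event_time) rows by window and monitor.

    Returns a dict mapping (window_start, monitor_id) to (avg, min, max).
    """
    groups = defaultdict(list)
    for monitor_id, data, event_time in rows:
        groups[(window_start(event_time, size), monitor_id)].append(data)
    return {
        key: (sum(values) / len(values), min(values), max(values))
        for key, values in groups.items()
    }
```

Unlike a streaming job, this sketch sees all rows at once, so it needs no watermarks; in Flink the window can only be evaluated if the grouping column is recognised as an event-time attribute, which is exactly what the exception complains about.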

Thank you,
Manas
