Hello, We are using pyflink's datastream api v1.12.1 to consume from kafka and want to use one of the fields to act as the "rowtime" for windowing. We realize we need to convert BIGINT to TIMESTAMP before we use it as "rowtime".
py4j.protocol.Py4JJavaError: An error occurred while calling o91.select. : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment. But we are not sure where and how that needs to be implemented. Some help here would be really appreciated. Thanks, Shilpa import os from pyflink.table.expressions import lit, Expression from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic from pyflink.datastream import CheckpointingMode, ExternalizedCheckpointCleanup from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSink, TableConfig from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka from pyflink.table.window import Slide def main(): env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE) config = env.get_checkpoint_config() config.enable_externalized_checkpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION) st_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() ) register_kafka_source(st_env) register_transactions_sink_into_csv(st_env) #Filter st_env.from_path("source") \ .window(Slide.over(lit(2).minutes).every(lit(1).minutes).on("rowtime").alias("w")) \ .group_by("customer_id, w") \ .select("""customer_id as customer_id, count(*) as total_counts, w.start as start_time, w.end as end_time """) \ .insert_into("sink_into_csv") def register_kafka_source(st_env): # Add Source st_env.connect( Kafka() \ .version("universal") \ .topic("topic1") \ .property("group.id", "topic_consumer") \ .property("security.protocol", "SASL_PLAINTEXT") \ .property("sasl.mechanism", "PLAIN") \ .property("bootstrap.servers", "<bootsptrap_servers>") \ .property("sasl.jaas.config", "<user,password>") \ .start_from_earliest() ).with_format( Json() .fail_on_missing_field(False) .schema( DataTypes.ROW([ DataTypes.FIELD("customer_id", DataTypes.STRING()), DataTypes.FIELD("time_in_epoch_milliseconds", DataTypes.BIGINT()) ]) ) ).with_schema( Schema() .field("customer_id", DataTypes.STRING()) .field("rowtime", DataTypes.BIGINT()) .rowtime( Rowtime() .timestamps_from_field("time_in_epoch_milliseconds") .watermarks_periodic_bounded(10) ) ).in_append_mode( ).create_temporary_table( "source" ) def register_transactions_sink_into_csv(env): result_file = "/opt/examples/data/output/output_file.csv" if os.path.exists(result_file): os.remove(result_file) env.register_table_sink("sink_into_csv", CsvTableSink(["customer_id", "total_count", "start_time", "end_time"], [DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)], result_file)) if __name__ == "__main__": main()