public static void main(String[] args) throws Exception {<br/>
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();<br/>
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);<br/>
List<Person> list = new ArrayList<>();<br/> list.add(new
Person("Fred",35));<br/> list.add(new Person("Wilma",35));<br/>
list.add(new Person("Pebbles",2));<br/> DataStream<Person> flintstones
= env.fromCollection(list);<br/> <br/> // 为数据流定义事件时间属性<br/>
DataStream<Person> flintstonesWithTime =
flintstones.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<Person>(Time.seconds(0)) {<br/>
@Override<br/> public long extractTimestamp(Person element) {<br/>
// 这里假设您的数据流中的每个元素都包含一个时间戳字段,您可以根据实际情况进行修改<br/> return
System.currentTimeMillis(); // 也可以使用您的数据中的时间字段<br/> }<br/> });<br/>
<br/> // 将DataStream转换为Table时定义窗口<br/> Table table =
tEnv.fromDataStream(flintstonesWithTime, $("name"), $("age"),
$("eventTime").rowtime());<br/> <br/> Table select =
table.window(Tumble.over(lit(10).seconds()).on($("eventTime")).as("w"))<br/>
.groupBy($("name"), $("w"))<br/> .select($("name"),
$("age").sum());<br/><br/> tEnv.toAppendStream(select,
Row.class).print();<br/><br/> env.execute("Flink Window
Example");<br/>}<br/><br/>public static class Person{<br/> public String
name;<br/> public Integer age;<br/> public Person(){}<br/> public
Person(String name,Integer age){<br/> this.name = name;<br/>
this.age = age;<br/> }<br/> public String toString(){<br/> return
this.name.toString()+":age "+this.age.toString();<br/> }<br/>}
At 2024-03-08 09:28:10, "ha.fen...@aisino.com" <ha.fen...@aisino.com> wrote:
>public static void main(String[] args) {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> List<Persion> list = new ArrayList<>();
> list.add(new Persion("Fred",35));
> list.add(new Persion("wilma",35));
> list.add(new Persion("Pebbles",2));
> DataStream<Persion> flintstones = env.fromCollection(list);
> Table table = tEnv.fromDataStream(flintstones);
> Table select = table.select($("name"), $("age"),
> $("addtime").proctime());
> Table select1 = select.window(
> Tumble.over(lit(10).second())
> .on($("addtime"))
> .as("w"))
> .groupBy($("name"), $("w"))
> .select($("name"), $("age").sum());
> select1.execute().print();
>
> }
>
> public static class Persion{
> public String name;
> public Integer age;
> public Persion(){}
> public Persion(String name,Integer age){
> this.name = name;
> this.age = age;
> }
> public String toString(){
> return this.name.toString()+":age "+this.age.toString();
> }
> }
>
>提示Exception in thread "main" org.apache.flink.table.api.ValidationException:
>Window properties can only be used on windowed tables
>是哪里错了?