public static void main(String[] args) throws Exception {<br/>    
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();<br/>    
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);<br/>    
List&lt;Person&gt; list = new ArrayList&lt;&gt;();<br/>    list.add(new 
Person("Fred",35));<br/>    list.add(new Person("Wilma",35));<br/>    
list.add(new Person("Pebbles",2));<br/>    DataStream&lt;Person&gt; flintstones 
= env.fromCollection(list);<br/>    <br/>    // 为数据流定义事件时间属性<br/>    
DataStream&lt;Person&gt; flintstonesWithTime = 
flintstones.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor&lt;Person&gt;(Time.seconds(0)) {<br/>   
     @Override<br/>        public long extractTimestamp(Person element) {<br/>  
          // 这里假设您的数据流中的每个元素都包含一个时间戳字段,您可以根据实际情况进行修改<br/>            return 
System.currentTimeMillis(); // 也可以使用您的数据中的时间字段<br/>        }<br/>    });<br/>   
 <br/>    // 将DataStream转换为Table时定义窗口<br/>    Table table = 
tEnv.fromDataStream(flintstonesWithTime, $("name"), $("age"), 
$("eventTime").rowtime());<br/>    <br/>    Table select = 
table.window(Tumble.over(lit(10).seconds()).on($("eventTime")).as("w"))<br/>    
        .groupBy($("name"), $("w"))<br/>            .select($("name"), 
$("age").sum());<br/><br/>    tEnv.toAppendStream(select, 
Row.class).print();<br/><br/>    env.execute("Flink Window 
Example");<br/>}<br/><br/>public static class Person{<br/>    public String 
name;<br/>    public Integer age;<br/>    public Person(){}<br/>    public 
Person(String name,Integer age){<br/>        this.name = name;<br/>        
this.age = age;<br/>    }<br/>    public String toString(){<br/>        return 
this.name.toString()+":age "+this.age.toString();<br/>    }<br/>}
At 2024-03-08 09:28:10, "ha.fen...@aisino.com" <ha.fen...@aisino.com> wrote:
>public static void main(String[] args) {
>        StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>        List<Persion> list = new ArrayList<>();
>        list.add(new Persion("Fred",35));
>        list.add(new Persion("wilma",35));
>        list.add(new Persion("Pebbles",2));
>        DataStream<Persion> flintstones = env.fromCollection(list);
>        Table table = tEnv.fromDataStream(flintstones);
>        Table select = table.select($("name"), $("age"), 
> $("addtime").proctime());
>        Table select1 = select.window(
>                Tumble.over(lit(10).second())
>                        .on($("addtime"))
>                        .as("w"))
>                .groupBy($("name"), $("w"))
>                .select($("name"), $("age").sum());
>        select1.execute().print();
>
>    }
>
>    public static class Persion{
>        public String name;
>        public Integer age;
>        public Persion(){}
>        public Persion(String name,Integer age){
>            this.name = name;
>            this.age = age;
>        }
>        public String toString(){
>            return this.name.toString()+":age "+this.age.toString();
>        }
>    }
>
>提示Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Window properties can only be used on windowed tables
>是哪里错了?

回复