??????
      ??????????????
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);

DataStream<Row&gt; rowDataStreamSource = bsEnv.addSource(new 
SourceFunction<Row&gt;() {
    @Override
    public void run(SourceContext<Row&gt; sourceContext) throws Exception {
        while (true) {
            Thread.sleep(1000);
            Row row = new Row(RowKind.INSERT, 2);
            row.setField(0, "a");
            row.setField(1, new Timestamp(System.currentTimeMillis()));
            sourceContext.collect(row);
        }
    }

    @Override
    public void cancel() {

    }
})
        .returns(new RowTypeInfo(new TypeInformation[]{Types.STRING, 
Types.SQL_TIMESTAMP}))
        
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

Table table = bsTableEnv.fromDataStream(rowDataStreamSource, $("a"), 
$("b").rowtime());
table.execute().print();??????????$("b").rowtime()????????????????rowtime????????table????????????????????????????????|
 +I |                              a | +1705471-09-26T16:47... | | +I |         
                     a | +1705471-09-26T16:47... | | +I |                       
       a | +1705471-09-26T16:47... 
|????????????????????????????????????????????rowtime????????????long??timestamp????????LocalDateTime??????????????????????????????LocalDateTime??????

回复