?????? ?????????????? StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);
DataStream<Row> rowDataStreamSource = bsEnv.addSource(new SourceFunction<Row>() { @Override public void run(SourceContext<Row> sourceContext) throws Exception { while (true) { Thread.sleep(1000); Row row = new Row(RowKind.INSERT, 2); row.setField(0, "a"); row.setField(1, new Timestamp(System.currentTimeMillis())); sourceContext.collect(row); } } @Override public void cancel() { } }) .returns(new RowTypeInfo(new TypeInformation[]{Types.STRING, Types.SQL_TIMESTAMP})) .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()); Table table = bsTableEnv.fromDataStream(rowDataStreamSource, $("a"), $("b").rowtime()); table.execute().print();??????????$("b").rowtime()????????????????rowtime????????table????????????????????????????????| +I | a | +1705471-09-26T16:47... | | +I | a | +1705471-09-26T16:47... | | +I | a | +1705471-09-26T16:47... |????????????????????????????????????????????rowtime????????????long??timestamp????????LocalDateTime??????????????????????????????LocalDateTime??????