????????????????: ??????fromElements?????????????????????????????????????????????????? ?????????????????????????????????????????????? ?????????? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); env.setRuntimeMode(RuntimeExecutionMode.BATCH); //????????????????
DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("aa", 1), Tuple2.of("aa", 2),Tuple2.of("aa", 3),Tuple2.of("bb", 2),Tuple2.of("bb", 3),Tuple2.of("bb", 4)); Table table = tenv.fromDataStream(source, Schema.newBuilder() .column("f0", "STRING") .column("f1", "INTEGER") .build()); tenv.createTemporaryView("test",table); //????????sql???? tenv.createTemporarySystemFunction("Average", avg5.Average.class); tenv.executeSql("SELECT f0,Average(f1) as rbm FROM test group by f0").print(); ?????? +----+--------------------------------+--------------------------------+ | op | f0 | rbm | +----+--------------------------------+--------------------------------+ | +I | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | +I | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | +----+--------------------------------+--------------------------------+ ?????????????????????????????????????????????????????? ??????????????batch?????????????????????????????????????????????????????????????????? ???????????????????????????????????????????????????????????????????????????????????????????????? ??????