??????????????kafka?????????????????????????????????????????????????????????????????????????????????? ????????????????????????????????????????????????????????????????????
????????????????
// Pipeline fragment: source stream -> OrderSplitService -> OrderDetailFilterService -> Table.
// NOTE(review): env, tableEnv, parameterTool and STREAM_PARALLELISM are declared outside this excerpt.

// Obtain the String source stream from KafkaConfigUtil.buildKafka(env) and run it with parallelism 1.
// NOTE(review): presumably a Kafka consumer source; confirm against KafkaConfigUtil.
DataStreamSource<String> dataStreamSource = KafkaConfigUtil.buildKafka(env).setParallelism(1);

// Turn each String record into zero or more OrderDetail elements via OrderSplitService.
// Operator parallelism is read from the STREAM_PARALLELISM parameter, defaulting to 5.
SingleOutputStreamOperator<OrderDetail> orderDetails = dataStreamSource.flatMap(new OrderSplitService())
        .setParallelism(parameterTool.getInt(STREAM_PARALLELISM, 5));

// Pass the OrderDetail stream through OrderDetailFilterService (a second flatMap stage),
// using the same STREAM_PARALLELISM setting (default 5).
// NOTE(review): presumably drops unwanted records; confirm against OrderDetailFilterService.
SingleOutputStreamOperator<OrderDetail> simpleResults = orderDetails.flatMap(new OrderDetailFilterService())
        .setParallelism(parameterTool.getInt(STREAM_PARALLELISM, 5));

// Expose the resulting stream as a Table with the listed columns; userActionTime is declared
// as a processing-time attribute via proctime().
Table orderDetailTable = tableEnv.fromDataStream(simpleResults, $("orderNo"), $("memberId"),$("merchantId"),$("storeId"),$("internalId"),$("type"),
        $("quantity"),$("unitPrice"),$("gmtPaidLong"),$("gmtPaid"),$("gmtPaidTimeStamp"),$("userActionTime").proctime());
????????????????????????????????????????????kafka??????????????????????????????????????????????????????