??????????????kafka??????????????????????????????????????????????????????????????????????????????????
????????????????????????????????????????????????????????????????????


????????????????
//????????????kafka????
DataStreamSource<String&gt; dataStreamSource = 
KafkaConfigUtil.buildKafka(env).setParallelism(1);

//??????????????????????????
SingleOutputStreamOperator<OrderDetail&gt; orderDetails = 
dataStreamSource.flatMap(new OrderSplitService())
        .setParallelism(parameterTool.getInt(STREAM_PARALLELISM, 5));
//????
SingleOutputStreamOperator<OrderDetail&gt; simpleResults = 
orderDetails.flatMap(new OrderDetailFilterService())
        .setParallelism(parameterTool.getInt(STREAM_PARALLELISM, 5));

Table orderDetailTable = tableEnv.fromDataStream(simpleResults,        
$("orderNo"), 
$("memberId"),$("merchantId"),$("storeId"),$("internalId"),$("type"),
        
$("quantity"),$("unitPrice"),$("gmtPaidLong"),$("gmtPaid"),$("gmtPaidTimeStamp"),$("userActionTime").proctime());????????????????????????????????????????????kafka??????????????????????????????????????????????????????

回复