phenixmzy edited a comment on issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink writer URL: https://github.com/apache/flink/pull/6075#issuecomment-414035357 @zhangminglei can the OrcFileWriter with BucketingSink roll when the batchSize is full? It seems that each record produces one file. The code: =========================================================== def orcSchemaMetaInfo = String.format( "struct<%s,%s,%s.......,%s>", "nt:string", "event_time:string", "event_id:string", ......, "appname:string") def getRowSink(distPath : String) = { val sink = new BucketingSink[Row](distPath + "/with-bucket/") sink.setBatchSize(1024 * 1024 * 1024) .setBucketer(new DateTimeBucketer[Row] ("yyyyMMdd/HHmm")) .setWriter(new OrcFileWriter[Row] (orcSchemaMetaInfo)) .setPartPrefix("sdk-etl") sink } def getOrcRow(item : sdkItem) : Row={ val row = Row.of( item.getNt, item.getEvent_time, item.getEvent_id,.....,item.getAppid,item.getAppname) row } ....... val kafkaConsumer = new FlinkKafkaConsumer011(inputTopic, new SimpleStringSchema, params.getProperties) val messageStream = env.addSource(kafkaConsumer) .flatMap(in => SDKParse.parseSDK(in, inputTopic)) .filter(item => item != None) .flatMap(item => Some(item).get) .map(item => getOrcRow(item)) messageStream.addSink(getRowSink(distPath))
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services