phenixmzy edited a comment on issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink writer
URL: https://github.com/apache/flink/pull/6075#issuecomment-414035357
 
 
   @zhangminglei
   Does the OrcFileWriter with BucketingSink only roll a part file when the batchSize is reached? It seems to write one file per record.
   The code:
   ===========================================================
    val orcSchemaMetaInfo = String.format(
        "struct<%s,%s,%s.......,%s>",
        "nt:string", "event_time:string", "event_id:string", ......, "appname:string")
   
    def getRowSink(distPath: String) = {
      val sink = new BucketingSink[Row](distPath + "/with-bucket/")
      sink.setBatchSize(1024 * 1024 * 1024) // expect a roll only once a part file reaches 1 GB
        .setBucketer(new DateTimeBucketer[Row]("yyyyMMdd/HHmm"))
        .setWriter(new OrcFileWriter[Row](orcSchemaMetaInfo))
        .setPartPrefix("sdk-etl")
      sink
    }
   
    def getOrcRow(item: sdkItem): Row = {
      Row.of(
        item.getNt, item.getEvent_time, item.getEvent_id, ....., item.getAppid, item.getAppname)
    }
   
   
   .......
    val kafkaConsumer = new FlinkKafkaConsumer011(inputTopic, new SimpleStringSchema, params.getProperties)
    val messageStream = env.addSource(kafkaConsumer)
      .flatMap(in => SDKParse.parseSDK(in, inputTopic))
      .filter(item => item != None)          // drop failed parses
      .flatMap(item => Some(item).get)       // unwrap the parse result
      .map(item => getOrcRow(item))
    messageStream.addSink(getRowSink(distPath))
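
    For reference, these are the rolling-related knobs I would expect to matter on
    BucketingSink. This is only a minimal sketch of my understanding; the path, the
    sizes, and the thresholds below are made-up example values, not my real settings:

    import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
    import org.apache.flink.types.Row
    // OrcFileWriter is the writer added by this PR.

    val sketchSink = new BucketingSink[Row]("/tmp/orc-out") // hypothetical output path
    sketchSink
      .setBatchSize(128L * 1024 * 1024)           // roll a part file once it reaches 128 MB
      .setInactiveBucketCheckInterval(60 * 1000L) // scan for inactive buckets every minute
      .setInactiveBucketThreshold(5 * 60 * 1000L) // close buckets with no writes for 5 minutes
      .setBucketer(new DateTimeBucketer[Row]("yyyyMMdd/HHmm"))
      .setWriter(new OrcFileWriter[Row](orcSchemaMetaInfo))

    With a 1 GB batch size I would expect many records per part file, so getting one
    file per record makes me think the writer is closing the file after each write.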
