[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584665#comment-16584665
 ] 

ASF GitHub Bot commented on FLINK-9407:
---------------------------------------

phenixmzy edited a comment on issue #6075: [FLINK-9407] [hdfs connector] 
Support orc rolling sink writer
URL: https://github.com/apache/flink/pull/6075#issuecomment-414035357
 
 
   @zhangminglei 
   Can the OrcFileWriter, when used with BucketingSink, roll files once the batchSize is reached?
   It seems to write one file per record.
   The code:
   ===========================================================
   def orcSchemaMetaInfo = String.format(
       "struct<%s,%s,%s.......,%s>",
       "nt:string", "event_time:string", "event_id:string", ......, "appname:string")
   
   def getRowSink(distPath : String) = {
       val sink = new BucketingSink[Row](distPath + "/with-bucket/")
       // Batch size is given in bytes: 1024 * 1024 * 1024 = 1 GiB
       sink.setBatchSize(1024 * 1024 * 1024)
         .setBucketer(new DateTimeBucketer[Row]("yyyyMMdd/HHmm"))
         .setWriter(new OrcFileWriter[Row](orcSchemaMetaInfo))
         .setPartPrefix("sdk-etl")
       sink
   }
   
   def getOrcRow(item : sdkItem) : Row = {
       val row = Row.of(
         item.getNt, item.getEvent_time, item.getEvent_id,.....,item.getAppid,item.getAppname)
       row
   }
   
   
   .......
   val kafkaConsumer = new FlinkKafkaConsumer011(inputTopic, new SimpleStringSchema, params.getProperties)
   val messageStream = env.addSource(kafkaConsumer)
     .flatMap(in => SDKParse.parseSDK(in, inputTopic))
     .filter(item => item != None)
     .flatMap(item => Some(item).get)
     .map(item => getOrcRow(item))
   messageStream.addSink(getRowSink(distPath))
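
For reference, the rolling behaviour being asked about can be pictured with a small stand-alone model. This is only a sketch under assumptions, not Flink's actual code: it assumes the sink compares the bytes written so far with batchSize before each write and opens a new part file once that position exceeds batchSize. The names RollingModel, recordSize and RollingDemo are hypothetical and exist only for this illustration.

```scala
// Simplified model of size-based part-file rolling. NOT Flink's implementation;
// RollingModel, recordSize and RollingDemo are hypothetical names.
final class RollingModel(batchSize: Long) {
  private var currentPos: Long = 0L // bytes written to the open part file
  private var partCounter: Int = 0  // number of part files opened so far
  private var open: Boolean = false // whether a part file is currently open

  /** Writes one record of `recordSize` bytes; returns the index of the part file used. */
  def write(recordSize: Long): Int = {
    // Roll when no file is open yet, or the open file has grown past batchSize.
    if (!open || currentPos > batchSize) {
      partCounter += 1
      currentPos = 0L
      open = true
    }
    currentPos += recordSize
    partCounter - 1
  }

  /** Total number of part files opened so far. */
  def partFiles: Int = partCounter
}

object RollingDemo {
  def main(args: Array[String]): Unit = {
    val sink = new RollingModel(batchSize = 100L)
    // Ten records of 30 bytes each, written one after another.
    val parts = Seq.fill(10)(sink.write(recordSize = 30L))
    println(parts.mkString(","))
    println(sink.partFiles)
  }
}
```

Under this model, several records share a part file until the size threshold is crossed. With a batchSize of 1 GiB as in the snippet above, a purely size-based rule would not be expected to produce one file per record, so the cause may lie elsewhere (for example in the position the writer reports); that is a guess, not a diagnosis.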

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support orc rolling sink writer
> -------------------------------
>
>                 Key: FLINK-9407
>                 URL: https://issues.apache.org/jira/browse/FLINK-9407
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: zhangminglei
>            Assignee: zhangminglei
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I suggest adding an ORC writer for the rolling 
> sink.
> Below, FYI.
> I tested the PR and verified the results with Spark SQL: the data read back 
> matches what was written. I will add more tests in the next couple of days, 
> including performance under compression with short checkpoint intervals, 
> and more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala> res3.show(3)
> +-----+---+-------+
> | name|age|married|
> +-----+---+-------+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-----+---+-------+
> only showing top 3 rows
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
