[ https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094434#comment-17094434 ]
Sivaprasanna Sethuraman commented on FLINK-17438:
-------------------------------------------------

I think this is an encoding issue caused by using DefaultExtractor with the String type.

> Flink StreamingFileSink chinese garbled
> ---------------------------------------
>
>                 Key: FLINK-17438
>                 URL: https://issues.apache.org/jira/browse/FLINK-17438
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.10.0
>         Environment: CDH6.0.1 hadoop3.0.0 Flink 1.10.0
>            Reporter: 颖
>            Priority: Blocker
>
> val writer:CompressWriterFactory[String] = new CompressWriterFactory[String](new DefaultExtractor[String]())
>   .withHadoopCompression(s"SnappyCodec")//${compress}
> val fileConfig = OutputFileConfig.builder().withPartPrefix(s"${prefix}").withPartSuffix(s"${suffix}").build()
> val bulkFormatBuilder = StreamingFileSink.forBulkFormat(new Path(output), writer)
> // custom bucket assignment strategy
> bulkFormatBuilder.withBucketAssigner(new DemoAssigner())
> // custom output file configuration
> bulkFormatBuilder.withOutputFileConfig(fileConfig)
> val sink = bulkFormatBuilder.build()
> // val rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(5)).withInactivityInterval(TimeUnit.MINUTES.toMillis(3)).withMaxPartSize(1 * 1024 * 1024)
> // val bulkFormatBuilder = StreamingFileSink.forRowFormat(new Path(output), new SimpleStringEncoder[String]()).withRollingPolicy(rollingPolicy.build())
> // val sink = bulkFormatBuilder.build()
> ds.map(_.log).addSink(sink).setParallelism(fileNum).name("snappy sink to hdfs")
>
> The job above writes the data to HDFS through the Flink API.
> The log contains Chinese fields, and these fields are garbled once Hive reads the files. The Hive table is defined as:
>
> CREATE EXTERNAL TABLE `demo_app`(
>   `str` string COMMENT 'raw record json')
> COMMENT 'app flink event-tracking log'
> PARTITIONED BY (
>   `ymd` string COMMENT 'date partition yyyymmdd')
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION
>   'hdfs://nameservice1/user/xxx/inke_back.db'
>
> Kafka source data (the status value 完成 means "completed"):
> {"name":"inke.dcc.flume.collect","type":"flume","status":"完成","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}
>
> Hive data:
> {"name":"inke.dcc.flume.collect","type":"flume","status":"������","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
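The encoding problem suggested in the comment can be reproduced without Flink on the classpath. This sketch rests on two assumptions that the thread does not confirm. The first is that DefaultExtractor turns each element into bytes using the JVM's default charset. The second is that Hive's LazySimpleSerDe decodes the text files as UTF-8. BytesExtractor, PlatformCharsetExtractor, Utf8Extractor and EncodingDemo are hypothetical names chosen for illustration. US-ASCII stands in for a non-UTF-8 worker default, so the garbled result here ("??") will not match the "������" from the report byte for byte.

```scala
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical stand-in for Flink's Extractor[T] contract (element => bytes),
// defined locally so the sketch runs without Flink on the classpath.
trait BytesExtractor[T] extends java.io.Serializable {
  def extract(element: T): Array[Byte]
}

// Models the suspected DefaultExtractor behaviour: encode with the JVM's default
// charset. The charset is a parameter so that a non-UTF-8 default can be simulated.
final class PlatformCharsetExtractor(charset: Charset) extends BytesExtractor[String] {
  override def extract(element: String): Array[Byte] =
    (element + "\n").getBytes(charset) // unmappable chars become the charset's replacement byte
}

// Candidate fix: always encode as UTF-8, whatever the worker's default charset is.
final class Utf8Extractor extends BytesExtractor[String] {
  override def extract(element: String): Array[Byte] =
    (element + "\n").getBytes(StandardCharsets.UTF_8)
}

object EncodingDemo {
  // Shortened version of the Kafka record; the escapes spell the status value 完成.
  val SampleRecord: String = "{\"status\":\"\u5b8c\u6210\"}"

  // Simulates a reader that decodes every line as UTF-8 (assumed Hive behaviour).
  def readAsUtf8(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8).stripSuffix("\n")

  def main(args: Array[String]): Unit = {
    val asciiDefault = new PlatformCharsetExtractor(StandardCharsets.US_ASCII)
    println(readAsUtf8(asciiDefault.extract(SampleRecord)))        // {"status":"??"}
    println(readAsUtf8(new Utf8Extractor().extract(SampleRecord))) // original record survives
    println(s"JVM default charset here: ${Charset.defaultCharset()}")
  }
}
```

If the assumption holds, one possible workaround is a class that implements Flink's Extractor[String] with the same body as Utf8Extractor. You would pass it to new CompressWriterFactory[String](...) in place of new DefaultExtractor[String](). Another option is to start the TaskManagers with -Dfile.encoding=UTF-8, for example through env.java.opts. Neither option has been tested against the job in the report. The sketch writes "\n" rather than System.lineSeparator() only so that its output is deterministic.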