[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-28 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094432#comment-17094432
 ] 

Jingsong Lee commented on FLINK-17438:
--

Try implementing an Extractor:

 
{code:java}
@Override
public byte[] extract(T element) {
    return (element.toString() + System.lineSeparator())
            .getBytes(StandardCharsets.UTF_8 /* your Hive charset */);
}
{code}
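
For completeness, a minimal self-contained sketch of such an extractor. It assumes the {{Extractor}} interface from the flink-compress module (package names may differ slightly between versions), and {{Utf8StringExtractor}} is just an example name:

{code:java}
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.flink.formats.compress.extractor.Extractor;

/** Example extractor that always encodes records as UTF-8 bytes. */
public class Utf8StringExtractor<T> implements Extractor<T>, Serializable {

    @Override
    public byte[] extract(T element) {
        // Encode explicitly instead of relying on the JVM default charset.
        return (element.toString() + System.lineSeparator())
                .getBytes(StandardCharsets.UTF_8);
    }
}
{code}

It could then replace {{new DefaultExtractor[String]()}} when constructing the {{CompressWriterFactory}} from the issue description.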
 

> Flink StreamingFileSink chinese garbled
> ---
>
> Key: FLINK-17438
> URL: https://issues.apache.org/jira/browse/FLINK-17438
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.0
> Environment: CDH6.0.1 hadoop3.0.0 Flink 1.10.0 
>Reporter: 颖
>Priority: Blocker
>
> val writer: CompressWriterFactory[String] = new CompressWriterFactory[String](new DefaultExtractor[String]())
>   .withHadoopCompression(s"SnappyCodec") // ${compress}
> val fileConfig = OutputFileConfig.builder().withPartPrefix(s"${prefix}").withPartSuffix(s"${suffix}").build()
> val bulkFormatBuilder = StreamingFileSink.forBulkFormat(new Path(output), writer)
> // custom bucket assigner
> bulkFormatBuilder.withBucketAssigner(new DemoAssigner())
> // custom output file configuration
> bulkFormatBuilder.withOutputFileConfig(fileConfig)
> val sink = bulkFormatBuilder.build()
> // val rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(5)).withInactivityInterval(TimeUnit.MINUTES.toMillis(3)).withMaxPartSize(1 * 1024 * 1024)
> // val bulkFormatBuilder = StreamingFileSink.forRowFormat(new Path(output), new SimpleStringEncoder[String]()).withRollingPolicy(rollingPolicy.build())
> // val sink = bulkFormatBuilder.build()
> ds.map(_.log).addSink(sink).setParallelism(fileNum).name("snappy sink to hdfs")
>  
> Calling the Flink API this way writes to HDFS. The log contains Chinese 
> fields, which come out garbled once Hive reads the files. The Hive table is:
> CREATE EXTERNAL TABLE `demo_app`(
>  `str` string COMMENT 'raw record JSON')
> COMMENT 'app Flink tracking log'
> PARTITIONED BY ( 
>  `ymd` string COMMENT 'date partition mmdd')
> ROW FORMAT SERDE 
>  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
> STORED AS INPUTFORMAT 
>  'org.apache.hadoop.mapred.TextInputFormat' 
> OUTPUTFORMAT 
>  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION
>  'hdfs://nameservice1/user/xxx/inke_back.db'
> Kafka source data:
> {"name":"inke.dcc.flume.collect","type":"flume","status":"完成","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}
>  
> Hive data:
> {"name":"inke.dcc.flume.collect","type":"flume","status":"��","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}
>  





[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-28 Thread Sivaprasanna Sethuraman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094434#comment-17094434
 ] 

Sivaprasanna Sethuraman commented on FLINK-17438:
-

I think this is an encoding issue caused by using the DefaultExtractor of String 
type.
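
As a rough illustration of the suspected failure mode (a simplification, not the actual Flink source): if the extractor converts the record to bytes without an explicit charset, the JVM default encoding of the TaskManagers decides the output, and a non-UTF-8 default will mangle Chinese characters.

{code:java}
/** Hypothetical simplification of an extractor that relies on the JVM default charset. */
public class DefaultCharsetExtractorSketch {

    public byte[] extract(String element) {
        // getBytes() without a charset falls back to the JVM's file.encoding,
        // which is not guaranteed to be UTF-8 on every TaskManager.
        return (element + System.lineSeparator()).getBytes();
    }
}
{code}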



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-28 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095027#comment-17095027
 ] 

颖 commented on FLINK-17438:
---

[~lzljs3620320] Yes, I used this to specify the UTF-8 encoding, but it didn't 
work.



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-28 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095065#comment-17095065
 ] 

颖 commented on FLINK-17438:
---

[~zenfenan] Do you have any solution?



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-28 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095066#comment-17095066
 ] 

Jingsong Lee commented on FLINK-17438:
--

[~why198852] Have you checked your Hive charset? Is it UTF-8 or something else?



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-28 Thread Sivaprasanna Sethuraman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095076#comment-17095076
 ] 

Sivaprasanna Sethuraman commented on FLINK-17438:
-

[~why198852] I agree with [~lzljs3620320]. Can you check which charset your Hive 
is configured with? Java Strings fully support Unicode, so Chinese characters are 
fine in them; you just have to encode them as UTF-8.

Can you try setting {{SERDEPROPERTIES('serialization.encoding'='utf-8')}} in 
your Hive table DDL?
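
For example, a sketch against the {{demo_app}} table from the description (adjust the table name to your own):

{code:sql}
-- Tell LazySimpleSerDe to decode the file contents as UTF-8.
ALTER TABLE demo_app
  SET SERDEPROPERTIES ('serialization.encoding' = 'utf-8');
{code}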



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-29 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095218#comment-17095218
 ] 

颖 commented on FLINK-17438:
---

[~lzljs3620320] [~zenfenan] Tried it again; still no effect.



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-04-30 Thread Sivaprasanna Sethuraman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097163#comment-17097163
 ] 

Sivaprasanna Sethuraman commented on FLINK-17438:
-

If the string Extractor in StreamingFileSink and the Hive SERDEPROPERTIES didn't 
help, I think you may have to look into the SerializationSchema implementation 
used in the FlinkKafkaConsumer. Can you share how your FlinkKafkaConsumer is 
instantiated?
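
For instance, if it is using {{SimpleStringSchema}}, it may be worth passing the charset explicitly, since {{SimpleStringSchema}} has a constructor that takes a {{Charset}}. A minimal sketch (class and method names here are just placeholders):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class Utf8ConsumerSketch {

    public static FlinkKafkaConsumer011<String> build(String topic, Properties props) {
        // Decode the Kafka record bytes as UTF-8 instead of the JVM default charset.
        return new FlinkKafkaConsumer011<>(
                topic, new SimpleStringSchema(StandardCharsets.UTF_8), props);
    }
}
{code}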



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-05-05 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100492#comment-17100492
 ] 

颖 commented on FLINK-17438:
---

Native Kafka 0.11 consumption; the code:

val topics = sourceKafkaTopic.split(",").toList
val consumer = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema(), sourceProperties)
consumer.setStartFromGroupOffsets()

val ds = env.addSource(consumer)(TypeInformation.of(classOf[String]))



[jira] [Commented] (FLINK-17438) Flink StreamingFileSink chinese garbled

2020-05-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100602#comment-17100602
 ] 

颖 commented on FLINK-17438:
---

[~zenfenan] [~lzljs3620320] The problem has been solved. It was mainly a 
flink-shaded-hadoop-2 package version dependency problem; in addition, you need 
to specify the encoding format for the DefaultExtractor. Thank you both for the 
guidance.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)