Re: Re: Re: Re: Problem writing ORC files with Hive streaming

2020-08-20 Post by Jingsong Li
This is a bug; it has already been fixed and is pending release.

On Fri, Aug 14, 2020 at 6:05 PM flink小猪 <18579099...@163.com> wrote:

> Based on my tests in the IDE: when writing to the Parquet table, the program keeps running even without the code snippet you sent, and Parquet-related logs
> are printed every checkpoint-interval; when writing to the ORC table, however, there is no log output at all, although the program keeps running. In addition,
> I submitted the same job via the SQL client: the Parquet table still works without any problem, while the ORC table job restarts endlessly with the following error.
>
> java.io.FileNotFoundException: File does not exist:
> hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
> ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
> ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at 

Re: Re: Re: Problem writing ORC files with Hive streaming

2020-08-13 Post by Rui Li
If you run this in an IDE, tableEnv.executeSql returns immediately and the program then exits; you can wait for the job to finish with something like this:

val tableResult = tEnv.executeSql(insert)
// wait to finish
tableResult.getJobClient.get
  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
  .get
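
Note that for an unbounded streaming INSERT, the call above blocks until the job reaches a terminal state (cancelled or failed), which is what keeps the IDE process from exiting.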

> Why does Hive streaming need the flink-orc_2.11 jar to write ORC files, while Parquet does not?

This is actually a missing ORC dependency. In principle it should only happen when table.exec.hive.fallback-mapred-writer is set to false; I'll fix it later.
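
As a minimal sketch, that option can also be toggled from code via the table config, using the option key named above, e.g.:

// Sketch only: disable the mapred fallback writer so that Flink's native
// ORC/Parquet writers are used; this is the code path that needs the
// flink-orc dependency mentioned above.
tableEnv.getConfig.getConfiguration
  .setString("table.exec.hive.fallback-mapred-writer", "false")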

> Where should I set the checkpoint interval when using the SQL client?

You can set execution.checkpointing.interval in flink-conf.yaml.
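For example (a sketch; the 20 s value just mirrors the interval used in the program below):

# flink-conf.yaml: checkpoint every 20 seconds for jobs submitted via the SQL client
execution.checkpointing.interval: 20s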


On Thu, Aug 13, 2020 at 10:23 AM flink小猪 <18579099...@163.com> wrote:

> I can't attach files, so I'll just paste the code directly
>
> import java.time.Duration
>
>
> import org.apache.flink.streaming.api.{CheckpointingMode,
> TimeCharacteristic}
> import
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
> TableResult}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.catalog.hive.HiveCatalog
>
>
>
>
> /**
>   * author dinghh
>   * time 2020-08-11 17:03
>   */
> object WriteHiveStreaming {
> def main(args: Array[String]): Unit = {
>
>
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build()
> val tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings)
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
>
>
>
>
>
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
> catalogName,  // catalog name
> "default",// default database
>
> "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources",
> // Hive config (hive-site.xml) directory
> "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
>
>
>
> //drop the stream table
> tableEnv.executeSql(
> """
>   |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
> """.stripMargin)
>
>
> //create the stream (datagen) table
> tableEnv.executeSql(
> """
>   |CREATE TABLE `stream_db`.`datagen_user` (
>   | id INT,
>   | name STRING,
>   | dt AS localtimestamp,
>   | WATERMARK FOR dt AS dt
>   |) WITH (
>   | 'connector' = 'datagen',
>   | 'rows-per-second'='10',
>   | 'fields.id.kind'='random',
>   | 'fields.id.min'='1',
>   | 'fields.id.max'='1000',
>   | 'fields.name.length'='5'
>   |)
> """.stripMargin)
>
>
> //switch to the Hive dialect
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>
>
> //drop the Hive ORC table
> tableEnv.executeSql(
> """
>   |DROP TABLE IF EXISTS `default`.`hive_user_orc`
>   |
> """.stripMargin)
>
>
> //create the Hive ORC table
> tableEnv.executeSql(
> """
>   |CREATE TABLE `default`.`hive_user_orc` (
>   |  id INT,
>   |  name STRING
>   |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute
> STRING ) STORED AS ORC TBLPROPERTIES (
>   |  'partition.time-extractor.timestamp-pattern'='$ts_dt
> $ts_hour:$ts_minute:00.000',
>   |  'sink.partition-commit.trigger'='partition-time',
>   |  'sink.partition-commit.delay'='1 min',
>   |
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   |)
> """.stripMargin)
>
>
> //drop the Hive Parquet table
> tableEnv.executeSql(
> """
>   |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
> """.stripMargin)
> //create the Hive Parquet table
> tableEnv.executeSql(
> """
>   |CREATE TABLE `default`.`hive_user_parquet` (
>   |  id INT,
>   |  name STRING
>   |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute
> STRING) STORED AS PARQUET TBLPROPERTIES (
>   |  'partition.time-extractor.timestamp-pattern'='$ts_dt
> $ts_hour:$ts_minute:00.000',
>   |  'sink.partition-commit.trigger'='partition-time',
>   |  'sink.partition-commit.delay'='1 min',
>   |
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   |)
> """.stripMargin)
>