Re: Re: Re:Re: Problem writing ORC files with Hive streaming
This is a bug. It has already been fixed and is awaiting release.

On Fri, Aug 14, 2020 at 6:05 PM flink小猪 <18579099...@163.com> wrote:

> According to my tests in the IDE, when writing to the parquet table the
> program keeps running even without the code you sent, and parquet-related
> logs are printed once every checkpoint-interval. When writing to the orc
> table, however, there is no log output at all, although the program keeps
> running. I also submitted the same job through the sql client: the parquet
> table still works without any problem, while the orc table job restarts
> endlessly and reports the following error.
>
> java.io.FileNotFoundException: File does not exist: hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at
Re: Re:Re: Problem writing ORC files with Hive streaming
If you run it in the IDE, tableEnv.executeSql returns immediately and the program then exits. You can wait for the job to finish with something like this:

    val tableResult = tEnv.executeSql(insert)
    // wait to finish
    tableResult.getJobClient.get
      .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
      .get

> Why does hive streaming need the flink-orc_2.11 jar to generate orc files, while parquet does not?

The orc dependency is actually missing here. In principle this should only happen when table.exec.hive.fallback-mapred-writer is set to false. I will fix it later.

> In the sql client, where should I set the checkpoint interval?

You can set execution.checkpointing.interval in flink-conf.yaml.

On Thu, Aug 13, 2020 at 10:23 AM flink小猪 <18579099...@163.com> wrote:

> I could not add an attachment, so I am pasting the code directly.
>
> import java.time.Duration
>
> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.catalog.hive.HiveCatalog
>
> /**
>  * author dinghh
>  * time 2020-08-11 17:03
>  */
> object WriteHiveStreaming {
>   def main(args: Array[String]): Unit = {
>
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     streamEnv.setParallelism(3)
>
>     val tableEnvSettings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode()
>       .build()
>     val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
>     tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
>     tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
>
>     val catalogName = "my_catalog"
>     val catalog = new HiveCatalog(
>       catalogName, // catalog name
>       "default", // default database
>       "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources", // Hive config (hive-site.xml) directory
>       "1.1.0" // Hive version
>     )
>     tableEnv.registerCatalog(catalogName, catalog)
>     tableEnv.useCatalog(catalogName)
>
>     // drop the stream table
>     tableEnv.executeSql(
>       """
>         |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
>       """.stripMargin)
>
>     // create the stream table
>     tableEnv.executeSql(
>       """
>         |CREATE TABLE `stream_db`.`datagen_user` (
>         | id INT,
>         | name STRING,
>         | dt AS localtimestamp,
>         | WATERMARK FOR dt AS dt
>         |) WITH (
>         | 'connector' = 'datagen',
>         | 'rows-per-second'='10',
>         | 'fields.id.kind'='random',
>         | 'fields.id.min'='1',
>         | 'fields.id.max'='1000',
>         | 'fields.name.length'='5'
>         |)
>       """.stripMargin)
>
>     // switch to the hive dialect
>     tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>
>     // drop the hive orc table
>     tableEnv.executeSql(
>       """
>         |DROP TABLE IF EXISTS `default`.`hive_user_orc`
>         |
>       """.stripMargin)
>
>     // create the hive orc table
>     tableEnv.executeSql(
>       """
>         |CREATE TABLE `default`.`hive_user_orc` (
>         | id INT,
>         | name STRING
>         |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING) STORED AS ORC TBLPROPERTIES (
>         | 'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
>         | 'sink.partition-commit.trigger'='partition-time',
>         | 'sink.partition-commit.delay'='1 min',
>         | 'sink.partition-commit.policy.kind'='metastore,success-file'
>         |)
>       """.stripMargin)
>
>     // drop the hive parquet table
>     tableEnv.executeSql(
>       """
>         |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
>       """.stripMargin)
>     // create the hive parquet table
>     tableEnv.executeSql(
>       """
>         |CREATE TABLE `default`.`hive_user_parquet` (
>         | id INT,
>         | name STRING
>         |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING) STORED AS PARQUET TBLPROPERTIES (
>         | 'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
>         | 'sink.partition-commit.trigger'='partition-time',
>         | 'sink.partition-commit.delay'='1 min',
>         | 'sink.partition-commit.policy.kind'='metastore,success-file'
>         |)
>       """.stripMargin)
>
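
For the sql client case discussed in this thread, the checkpoint interval can go into flink-conf.yaml. The fragment below is only a sketch: the 20-second interval and the exactly-once mode are assumptions copied from the values the quoted program sets through ExecutionCheckpointingOptions, not settings confirmed for the reporter's cluster.

```yaml
# flink-conf.yaml (sketch; values mirror the quoted Scala program, adjust as needed)
# Interval between checkpoints; streaming file sinks commit part files on checkpoint.
execution.checkpointing.interval: 20s
# Assumed to match CheckpointingMode.EXACTLY_ONCE used in the quoted code.
execution.checkpointing.mode: EXACTLY_ONCE
```

The sql client reads this file at startup, so it needs to be restarted for the new values to apply.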