Hi Zhou Zach, a quick question: if you set your program's parallelism to 1, can it still read the Hive data correctly?
Zhou Zach <wander...@163.com> wrote on Mon, Jul 13, 2020 at 8:17 PM:

OK, thanks for the answers.

At 2020-07-13 19:49:10, "Jingsong Li" <jingsongl...@gmail.com> wrote:

kafka_table needs to be created under the default dialect.

Whichever dialect you use, the table is saved into the Hive metastore (unless you use the temporary table syntax).

Best,
Jingsong

On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <wander...@163.com> wrote:

When I created kafka_table I was using the default dialect; after switching to the HiveCatalog, neither the WATERMARK nor the WITH syntax is supported. If a table is created under the default dialect, is it only valid for the current session?

At 2020-07-13 19:27:44, "Jingsong Li" <jingsongl...@gmail.com> wrote:

Hi,

Question 1:

As long as the current catalog is the HiveCatalog. In theory the Kafka table is also stored in the Hive metastore; if you do not want the error, you can use CREATE TABLE XXX IF NOT EXISTS.

To clarify: what exactly do you mean by "cannot see it"? Can you try the Kafka table on its own, does it disappear after a restart?

Question 2:

A table created with filesystem is a filesystem table; it has nothing to do with the Hive metastore. You need to use the DDL syntax for filesystem tables [1].

A filesystem table's data is written directly to the file system, and its format is Hive-compatible, so if the write path is the path of some Hive table, the data can be queried on the Hive side. Its partition commit does not support the metastore, however, so there is no default implementation that automatically adds partitions to Hive; you need a custom partition-commit-policy.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html

Best,
Jingsong

On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <wander...@163.com> wrote:

Embarrassing... I had two projects open and had been editing the wrong one. I can now query the data from Hive successfully. Thanks for the community's enthusiastic replies, @Jingsong Li, @夏帅. I have watched Jingsong's video from the group several times over the past two days. Sincere thanks!
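[Editor's note] For reference, the filesystem DDL [1] that this answer points to is written in the default dialect with an explicit 'connector' option, not as a Hive-dialect table with TBLPROPERTIES. A rough, untested sketch (the path is left as a placeholder, and since the metastore commit policy does not apply here, only 'success-file' is used):

```sql
-- Default-dialect filesystem table (sketch only; 'path' is a placeholder)
CREATE TABLE fs_table (
  user_id STRING,
  age INT,
  dt STRING,
  hr STRING
) PARTITIONED BY (dt, hr) WITH (
  'connector' = 'filesystem',
  'path' = '...',
  'format' = 'parquet',
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.delay' = '0s',
  'sink.partition-commit.policy.kind' = 'success-file'
);
```

If the 'path' points at an existing Hive table's warehouse directory, the written files can be queried from the Hive side once the partitions are added there (manually or via a custom policy, as noted above).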
Two more questions:

Question 1:
The kafka_table I created is not visible in either Hive or the Flink SQL client. Also, every time I rerun the program it fails unless I drop hive_table first; after dropping hive_table1 it runs fine. But I never need to drop kafka_table for the program to run. So is the kafka_table I created a temporary table, and only hive_table is stored in the metastore?

Question 2:
A helpful community member just answered that, instead of the HiveCatalog, the filesystem connector can also be used to create a Hive table. I tried it and got an error:

java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_161]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_161]
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_161]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_161]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_161]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_161]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_161]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154) [qile-data-flow-1.0.jar:?]
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [qile-data-flow-1.0.jar:?]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [qile-data-flow-1.0.jar:?]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [qile-data-flow-1.0.jar:?]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [qile-data-flow-1.0.jar:?]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [qile-data-flow-1.0.jar:?]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [qile-data-flow-1.0.jar:?]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.

Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.
Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
	at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='filesystem''.
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
	at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
Available factory identifiers are:

blackhole
hbase-1.4
jdbc
kafka
print
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
	at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
	at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
	... 10 more
query:

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)

val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'connector'='filesystem',
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.delay'='0s',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid VARCHAR,
    |  -- uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  -- 'connector.topic' = 'user_long',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |INSERT INTO hive_table
    |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
    |FROM kafka_table
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
    |""".stripMargin)
  .print()

At 2020-07-13 17:52:54, "Jingsong Li" <jingsongl...@gmail.com> wrote:

Please paste the complete program again.

Best,
Jingsong

On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <wander...@163.com> wrote:

Hi,

I have now changed it to:
'sink.partition-commit.delay'='0s'

More than 20 checkpoints have completed, and 20-odd files have been produced on HDFS, but the Hive table still shows no data.

At 2020-07-13 17:23:34, "夏帅" <jkill...@dingtalk.com> wrote:

Hi,
You have set SINK_PARTITION_COMMIT_DELAY to 1 hour.

From: Zhou Zach <wander...@163.com>
Sent: Monday, July 13, 2020 17:09
To: user-zh <user-zh@flink.apache.org>
Subject: Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Checkpointing is enabled:

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)

The interval is 5 s and the timeout 10 s. Still, after waiting more than two minutes, a dozen or so files had been written to HDFS, but querying Hive returned no data.

At 2020-07-13 16:52:16, "Jingsong Li" <jingsongl...@gmail.com> wrote:

Checkpointing is enabled, right? What is the delay set to?

Partitions are added after checkpoint completion + the delay.

Best,
Jingsong

On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <wander...@163.com> wrote:

Hi,
Following your hint I added the HiveCatalog, and data is now written to HDFS successfully. But why is there no data when I query the Hive table directly through Hue? Must partitions be added to the Hive table manually? I currently have set
'sink.partition-commit.policy.kind'='metastore'

At 2020-07-13 15:01:28, "Jingsong Li" <jingsongl...@gmail.com> wrote:

Hi,

Are you using a HiveCatalog? Hive tables and the Hive dialect must be combined with a HiveCatalog.

Otherwise you can only use the filesystem connector; if filesystem also gives you an error, paste the error message.

Best,
Jingsong

On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach <wander...@163.com> wrote:

What should the connector be set to for a Flink 1.11 Hive table sink? Trying
WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file')
also fails.

query:

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='1 h',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid VARCHAR,
    |  -- uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  -- 'connector.topic' = 'user_long',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |INSERT INTO hive_table
    |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
    |FROM kafka_table
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
    |""".stripMargin)
  .print()

Error stack:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table'.
Table options are:

'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='1 h'
'sink.partition-commit.policy.kind'='metastore,success-file'
'sink.partition-commit.trigger'='partition-time'
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
	at org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
	at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
	... 19 more
--
Best, Jingsong Lee
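[Editor's note] A recurring point in this thread is when a written partition actually becomes visible: with 'sink.partition-commit.trigger'='partition-time', a partition is committed once the watermark passes the partition's time (derived from 'partition.time-extractor.timestamp-pattern') plus 'sink.partition-commit.delay', and commits only take effect on checkpoint completion. A minimal Python sketch of that decision logic (function names are illustrative, not Flink API):

```python
from datetime import datetime, timedelta

def extract_partition_time(pattern, spec):
    # Substitute partition values into the timestamp pattern and parse it,
    # loosely mirroring Flink's default partition time extractor.
    s = pattern
    for key, value in spec.items():
        s = s.replace("$" + key, value)
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")

def should_commit(partition_time, watermark, delay):
    # A partition may commit once the watermark passes its time plus the delay
    # (and, in Flink, only after the next checkpoint completes).
    return watermark >= partition_time + delay

pt = extract_partition_time("$dt $hr:00:00", {"dt": "2020-07-13", "hr": "18"})
# With a 1 h delay, a watermark at 18:30 is not yet enough:
print(should_commit(pt, datetime(2020, 7, 13, 18, 30), timedelta(hours=1)))  # False
# But once the watermark reaches 19:00, the partition can commit:
print(should_commit(pt, datetime(2020, 7, 13, 19, 0), timedelta(hours=1)))   # True
```

This matches the behavior observed earlier in the thread: with the original '1 h' delay, the 18:00 partition could not commit before the watermark reached 19:00, even though files were already visible on HDFS.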