flink1.11??Streaming File Sink????
flink1.11??Streaming File Sinkhdfsexactly-once
????
Re: Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?
而如果是连续keyBy,比如.keyBy(xx).keyBy(yy).window()这样keyBy多少此也只最后一个有效,window当然还是只有1个。不会出现多个window的。 yidan zhao 于2021年2月23日周二 下午3:31写道: > 我突然感觉还是沟通问题。window的只有1个。因为你就写了一个window。 > 我指的是flatMap和window是分开的算子,不会是1个算子。 > > hdxg1101300123 于2021年2月22日周一 下午11:37写道: > >> >> 为什么flatmap就是2个 >> >> >> 发自vivo智能手机 >> > 不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。 >> > >> > yidan zhao 于2021年2月22日周一 上午10:31写道: >> > >> > > 只有最后一个keyBy有效。 >> > > >> > > Hongyuan Ma 于2021年2月21日周日 下午10:59写道: >> > > >> > >> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, >> > >> 还是在前一次keyby的基础上生成m*n个窗口? >> > >> >> > >> >> > >> 像下面这样写, 最后的窗口是只按area划分的吗? >> > >> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息 >> > >> stream.keyby("id") >> > >> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state >> > >> .assignTime() // 修改轨迹eventTime为预测出的时间 >> > >> .keyby("area") >> > >> .window() // 根据区域划分窗口 >> > >> .process() // 统计各个区域内的轨迹 >> > >> >> > >> >> >
Re: Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?
我突然感觉还是沟通问题。window的只有1个。因为你就写了一个window。 我指的是flatMap和window是分开的算子,不会是1个算子。 hdxg1101300123 于2021年2月22日周一 下午11:37写道: > > 为什么flatmap就是2个 > > > 发自vivo智能手机 > > 不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。 > > > > yidan zhao 于2021年2月22日周一 上午10:31写道: > > > > > 只有最后一个keyBy有效。 > > > > > > Hongyuan Ma 于2021年2月21日周日 下午10:59写道: > > > > > >> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, > > >> 还是在前一次keyby的基础上生成m*n个窗口? > > >> > > >> > > >> 像下面这样写, 最后的窗口是只按area划分的吗? > > >> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息 > > >> stream.keyby("id") > > >> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state > > >> .assignTime() // 修改轨迹eventTime为预测出的时间 > > >> .keyby("area") > > >> .window() // 根据区域划分窗口 > > >> .process() // 统计各个区域内的轨迹 > > >> > > >> >
通过普通ddl来读写hive
问一下社区有没有计划支持普通的ddl(不用hive的catalog)来进行读写hive表吗 现在不支持是有什么考虑吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
大佬们, assignTimestampsAndWatermarks() 支持将eventTime设置为未来的时间吗
我想预测某个轨迹点后续5秒的轨迹, 并设置eventTime为未来的时间 我使用AscendingTimestampExtractor 但是报了 WARN Timestamp monotony violated xxx < yyy // 对于每个轨迹点, 预测输出其后续10秒的点, 比如A车10秒时来了一条, B车15秒时来了一条 stream.flatmap() // 预测出A车11~20秒的轨迹, B车16~25秒时的轨迹 .assignTimestamps(new AscendingTimestampExtractor()) // 设置eventTime为预测计算出的未来的时间 .window(1s) // 收集预测出的第16, 第17秒...第20秒时的A, B两个轨迹点 .process() // 对某一秒时的轨迹点进行两两距离计算, 距离过近就发送一条警报消息
Re: Re: Flink SQL 写入Hive问题请教
是的,hive表必须存在HiveCatalog里才能正常读写 On Tue, Feb 23, 2021 at 10:14 AM yinghua...@163.com wrote: > > Flink的版本是1.11.3,目前我们所有表的catalog类型都是GenericInMemoryCatalog,是不是Hive表要用HiveCatalog才行? > > > > yinghua...@163.com > > 发件人: Rui Li > 发送时间: 2021-02-23 10:05 > 收件人: user-zh > 主题: Re: Re: Flink SQL 写入Hive问题请教 > 你好, > > 用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么? > > On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote: > > > 我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert > > into时创建Hive表时提示没有连接器的配置 > > Table options are: 'is_generic'='false' > > 'partition.time-extractor.timestamp-pattern'='$dt $hr' > > 'sink.partition-commit.delay'='0S' > > 'sink.partition-commit.policy.kind'='metastore,success-file' > > 'sink.partition-commit.trigger'='partition-time' at > > > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > > at > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > > at > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691) > > at > > > com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242) > > at > com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201) > > at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at > > com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at > > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > > ... 11 more Caused by: org.apache.flink.table.api.ValidationException: > > Table options do not contain an option key 'connector' for discovering a > > connector. at > > > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) > > at > > > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > > ... 37 more > > > > > > 假如在执行DML语句时设置Hive方言,那么Kafka的表不是Hive语法,这个该怎么处理? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2021-02-22 17:12:55,"eriendeng" 写道: > > >你这没有把dialect set成hive吧,走到了else分支。default > > >dialect是需要指定connector的,参考文档的kafka到hive代码 > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing > > > > > > > > > > > >-- > > >Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > > -- > Best regards! > Rui Li > -- Best regards! Rui Li
Re: Re: Flink SQL 写入Hive问题请教
Flink的版本是1.11.3,目前我们所有表的catalog类型都是GenericInMemoryCatalog,是不是Hive表要用HiveCatalog才行? yinghua...@163.com 发件人: Rui Li 发送时间: 2021-02-23 10:05 收件人: user-zh 主题: Re: Re: Flink SQL 写入Hive问题请教 你好, 用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么? On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote: > 我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert > into时创建Hive表时提示没有连接器的配置 > Table options are: 'is_generic'='false' > 'partition.time-extractor.timestamp-pattern'='$dt $hr' > 'sink.partition-commit.delay'='0S' > 'sink.partition-commit.policy.kind'='metastore,success-file' > 'sink.partition-commit.trigger'='partition-time' at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > at > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691) > at > com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at > com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 11 more Caused by: org.apache.flink.table.api.ValidationException: > Table options do not contain an option key 'connector' for discovering a > connector. at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > ... 37 more > > > 假如在执行DML语句时设置Hive方言,那么Kafka的表不是Hive语法,这个该怎么处理? > > > > > > > > > > > > > > > > > > 在 2021-02-22 17:12:55,"eriendeng" 写道: > >你这没有把dialect set成hive吧,走到了else分支。default > >dialect是需要指定connector的,参考文档的kafka到hive代码 > > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing > > > > > > > >-- > >Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: Re: Flink SQL 写入Hive问题请教
你好, 用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么? On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote: > 我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert > into时创建Hive表时提示没有连接器的配置 > Table options are: 'is_generic'='false' > 'partition.time-extractor.timestamp-pattern'='$dt $hr' > 'sink.partition-commit.delay'='0S' > 'sink.partition-commit.policy.kind'='metastore,success-file' > 'sink.partition-commit.trigger'='partition-time' at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > at > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691) > at > com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at > com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 11 more Caused by: org.apache.flink.table.api.ValidationException: > Table options do not contain an option key 'connector' for discovering a > connector. at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > ... 37 more > > > 假如在执行DML语句时设置Hive方言,那么Kafka的表不是Hive语法,这个该怎么处理? > > > > > > > > > > > > > > > > > > 在 2021-02-22 17:12:55,"eriendeng" 写道: > >你这没有把dialect set成hive吧,走到了else分支。default > >dialect是需要指定connector的,参考文档的kafka到hive代码 > > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing > > > > > > > >-- > >Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li
Re: Re:Re: Flink SQL 写入Hive问题请教
在hive catalog下创建kafka source表会在hive metastore中创建一张仅包含元数据的表,hive不可查,flink任务中可以识别并当成hive表读,然后只需要在hive dialect下正常读出写入即可。 参考 https://my.oschina.net/u/2828172/blog/4415970 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Re: Flink SQL 写入Hive问题请教
我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert into时创建Hive表时提示没有连接器的配置 Table options are: 'is_generic'='false' 'partition.time-extractor.timestamp-pattern'='$dt $hr' 'sink.partition-commit.delay'='0S' 'sink.partition-commit.policy.kind'='metastore,success-file' 'sink.partition-commit.trigger'='partition-time' at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691) at com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242) at com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201) at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ... 11 more Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector. at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ... 37 more 假如在执行DML语句时设置Hive方言,那么Kafka的表不是Hive语法,这个该怎么处理? 在 2021-02-22 17:12:55,"eriendeng" 写道: >你这没有把dialect set成hive吧,走到了else分支。default >dialect是需要指定connector的,参考文档的kafka到hive代码 >https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
回复: Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?
为什么flatmap就是2个 发自vivo智能手机 > 不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。 > > yidan zhao 于2021年2月22日周一 上午10:31写道: > > > 只有最后一个keyBy有效。 > > > > Hongyuan Ma 于2021年2月21日周日 下午10:59写道: > > > >> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口, > >> 还是在前一次keyby的基础上生成m*n个窗口? > >> > >> > >> 像下面这样写, 最后的窗口是只按area划分的吗? > >> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息 > >> stream.keyby("id") > >> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state > >> .assignTime() // 修改轨迹eventTime为预测出的时间 > >> .keyby("area") > >> .window() // 根据区域划分窗口 > >> .process() // 统计各个区域内的轨迹 > >> > >>
Re: Flink standalone模式如何区分各个任务的日志?
Flink的standalone application模式[1]是可以每个app都单独记录日志的 [1]. https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster Best, Yang xingoo <23603...@qq.com> 于2021年2月22日周一 下午12:01写道: > Hi, > > 这样体验上还是不太友好,如果能做成spark那种每个Job独立记录日志就好了 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql 写入clickhouse性能优化
flink-jdbc写入clickhouse 在flink 1.12版本 tps只能到35W左右的tps,各位,有什么可以性能调优?
Re: Flink SQL 写入Hive问题请教
你这没有把dialect set成hive吧,走到了else分支。default dialect是需要指定connector的,参考文档的kafka到hive代码 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing -- Sent from: http://apache-flink.147419.n8.nabble.com/