flink new source api, kafka部分对kafka-client的版本要求。
如题,当前新的kafaksouce貌似对kafka-client版本做了依赖,比如代码KafkaPartitionSplitReader.acquireAndSetStoppingOffsets方法中用到的 consumer.committed(partitionsStoppingAtCommitted) .forEach( (tp, offsetAndMetadata) -> { Preconditions.checkNotNull( offsetAndMetadata, String.format( "Partition %s should stop at committed offset. " + "But there is no committed offset of this partition for group %s", tp, groupId)); stoppingOffsets.put(tp, offsetAndMetadata.offset()); }); 在kafka-client2.2.0中是不符合语法的。committed方法签名不同。
Re:Re: Re: Re: 公司数据密文,实现group by和join
您好! 不可以,因为这种加密算法有点特别, 比如id是密文的话,相同id的密文也不一样的,不管你这个密文的明文是不是一样,它的密文形成的字符串肯定不一样; 谢谢. 在 2021-11-11 12:43:48,"yidan zhao" 写道: >如果是group by id,不论id是明文还是密文,相同id的密文肯定也一样,直接group by 密文id不可以吗? > >godfrey he 于2021年11月1日周一 下午3:22写道: > >> 上传的图片没法显示,通过图床工具或纯文本方式重新发一遍 >> >> lyh1067341434 于2021年11月1日周一 上午10:42写道: >> >> > 您好! >> > >> > 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组; >> > 为了更清楚表达,下面为图示: >> > >> > 谢谢您! >> > >> > >> > >> > >> > >> > >> > >> > 在 2021-10-29 10:49:35,"Caizhi Weng" 写道: >> > >Hi! >> > > >> > >你是不是想写这样的 SQL: >> > > >> > >SELECT id, sum(price) AS total_price FROM ( >> > > SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 >> ON >> > >decrypt_udf(T1.id, T2.id) = 1 >> > >) GROUP BY id >> > > >> > >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。 >> > > >> > >lyh1067341434 于2021年10月29日周五 上午9:41写道: >> > > >> > >> 您好! >> > >> >> > >> >> > >> 感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了; >> > >> >> > >> >> > >> 比如 有一张表 a, 表a有 >> > >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA"); >> > >> >> > >> >> > >> 如果我想求 不同id组的price之和: >> > >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确; >> > >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确; >> > >> >> > >> >> > >> >> > >> >> > >> 问题: >> > >> >> > >> >> > >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确; >> > >>我看到api只能指定分组的key是哪一个; >> > >> >> > >> >> > >> 谢谢您! >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> 在 2021-10-28 11:09:26,"Caizhi Weng" 写道: >> > >> >Hi! >> > >> > >> > >> >没太明白你的需求。你的需求是不是 >> > >> > >> > >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。 >> > >> > >> > >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。 >> > >> > >> > >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。 >> > >> > >> > >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by >> > >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。 >> > >> > >> > >> >[1] >> > >> > >> > >> >> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/ >> > >> > >> > >> >lyh1067341...@163.com 于2021年10月27日周三 >> 下午5:20写道: >> > >> > >> > >> >> 您好: >> > >> >> >> > >> >> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group >> > >> >> by,对于使用spark和flink算子不知道如何实现。 >> > >> >> >> > >> >> 问题: >> > >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口) >> > >> >> >> > >> >> 谢谢您。 >> > >> >> >> > >> >> >> > >> >> >> > >> >> 发自 网易邮箱大师 >> > >> >> > >> > >> > >> > >> > >>
Re: 回复:Re:回复: flink sql消费kafka各分区消息不均衡问题
不清楚你说的“作业”是啥,作业多,作业少,你是多个作业吗? 我感觉你是讲subtask数多少估计,如果TM的压力完全是由于flink导致,那应该就是你slot分配在TM不均衡导致。 考虑设置 cluster.evenly-spread-out-slots: true 试试。 casel.chen 于2021年11月1日周一 上午10:48写道: > 写入数据看过是均衡的,没有问题。消费端位点差别挺大,积压情况大部分分区都很小,少数个别分区积压很大,达到数十万级别。跟TM负载有关吗? > > > > > > > > > > > > > > > > > > 在 2021-10-31 10:13:11,"悟空" 写道: > >hi > >那你就需要排查下上有写入方是什么策略写入 或者通过Kafka 查看下 每个分区数据量是否明显不均衡。感觉可能和写入方的原因很大 > > > > > > > >发自我的iPhone > > > > > >-- 原始邮件 -- > >发件人: casel.chen >发送时间: 2021年10月30日 18:19 > >收件人: user-zh >主题: 回复:Re:回复: flink sql消费kafka各分区消息不均衡问题 > > > > > > > > >kafka是作为flink作业source来消费的,作业跑在k8s上以sessionmode运行,发现有的TM作业特别多,消耗资源也多,而有的TM作业少,占用资源也少。 > >会不会是这个原因造成kafka不同分区消费能力差异?那些消费慢的task恰好落在比较忙的TM。有什么办法可以让各个TM负载均衡吗? > > > > > > > > > > > > > > > > > >在2021-10-2917:32:40,"WuKong" 写道: > >Hicasel.chan: > >请问你是sink端数据不均衡还是source端数据不均衡。 > > >如果是写入端,看看你是否自定义了分区字段,flink默认是策略应该不会造成数据不均衡,但是无法保证分区有序性。同时也可以关注下下游消费者是否会有消费不同分区处理性能不同问题。 > > > > > > > >--- > >Best, > >WuKong > > > >发件人:casel.chen > >发送时间:2021-10-2909:30 > >收件人:user-zh@flink.apache.org > >主题:flinksql消费kafka各分区消息不均衡问题 > > >flinksql消费kafka消息做数据同步,前期没有出现堆积不均的问题,这两天发现某些kafka分区积压特别多,会是什么原因造成的?怎样解决呢?从统计结果上看,消息还算均匀地打到各个kafka分区上。作业没有开窗和聚合,只是攒一批写一批这样子的。注:作业是跑在k8s上的 > > > > > > >|分区ID|客户端|最大位点|消费位点|堆积量| > > >|0|n/a|155,397,108|155,396,747|361| > > >|1|n/a|155,215,444|155,215,108|336| > > >|2|n/a|155,369,596|155,369,258|338| > > >|3|n/a|155,422,750|155,422,337|413| > > >|4|n/a|155,163,343|154,489,738|673,605| > > >|5|n/a|155,401,388|154,702,173|699,215| > > >|6|n/a|155,372,040|154,651,398|720,642| > > >|7|n/a|155,208,461|154,528,301|680,160| > > >|8|n/a|155,383,486|154,696,404|687,082| > > >|9|n/a|155,391,068|154,668,426|722,642| > > >|10|n/a|155,139,417|154,450,377|689,040| > > >|11|n/a|155,411,848|155,411,518|330| > > >
Re: Re: Re: 公司数据密文,实现group by和join
如果是group by id,不论id是明文还是密文,相同id的密文肯定也一样,直接group by 密文id不可以吗? godfrey he 于2021年11月1日周一 下午3:22写道: > 上传的图片没法显示,通过图床工具或纯文本方式重新发一遍 > > lyh1067341434 于2021年11月1日周一 上午10:42写道: > > > 您好! > > > > 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组; > > 为了更清楚表达,下面为图示: > > > > 谢谢您! > > > > > > > > > > > > > > > > 在 2021-10-29 10:49:35,"Caizhi Weng" 写道: > > >Hi! > > > > > >你是不是想写这样的 SQL: > > > > > >SELECT id, sum(price) AS total_price FROM ( > > > SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 > ON > > >decrypt_udf(T1.id, T2.id) = 1 > > >) GROUP BY id > > > > > >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。 > > > > > >lyh1067341434 于2021年10月29日周五 上午9:41写道: > > > > > >> 您好! > > >> > > >> > > >> 感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了; > > >> > > >> > > >> 比如 有一张表 a, 表a有 > > >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA"); > > >> > > >> > > >> 如果我想求 不同id组的price之和: > > >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确; > > >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确; > > >> > > >> > > >> > > >> > > >> 问题: > > >> > > >> > > >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确; > > >>我看到api只能指定分组的key是哪一个; > > >> > > >> > > >> 谢谢您! > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> 在 2021-10-28 11:09:26,"Caizhi Weng" 写道: > > >> >Hi! > > >> > > > >> >没太明白你的需求。你的需求是不是 > > >> > > > >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。 > > >> > > > >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。 > > >> > > > >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。 > > >> > > > >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by > > >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。 > > >> > > > >> >[1] > > >> > > > >> > https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/ > > >> > > > >> >lyh1067341...@163.com 于2021年10月27日周三 > 下午5:20写道: > > >> > > > >> >> 您好: > > >> >> > > >> > 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group > > >> >> by,对于使用spark和flink算子不知道如何实现。 > > >> >> > > >> >> 问题: > > >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口) > > >> >> > > >> >> 谢谢您。 > > >> >> > > >> >> > > >> >> > > >> >> 发自 网易邮箱大师 > > >> > > > > > > > > > > >
flinksql 写 hive ,orc格式,应该支持下压缩。
如题,有支持压缩的方法吗当前,看文档没找到应该。
Re: flinkSQL写hive表,timestamp-pattern设置,分区是yyyyMMdd而不是yyyy-MM-dd的情况怎么搞。
Thanks! +1 to pattern Best, Jingsong On Wed, Nov 10, 2021 at 7:52 PM yidan zhao wrote: > > 我在jira回复了下,我感觉还是能配置化好一些,那个liwei貌似现在加了个basicDate这个太单一了。 > > Jingsong Li 于2021年11月4日周四 下午12:18写道: > > > 你可以自定义个partition.time-extractor.class来自己解析 > > > > Flink应该搞个对应的partition.time-extractor.kind来默认支持你的需求。 > > 建了个JIRA: https://issues.apache.org/jira/browse/FLINK-24758 > > > > Best, > > Jingsong > > > > On Thu, Nov 4, 2021 at 11:47 AM yidan zhao wrote: > > > > > > 如题,我当前是select date_format(xxx, 'MMdd') as dt... > > > > > > partition.time-extractor.timestamp-pattern是$dt $hour:00:00这样。 > > > > > > 但是这样会导致报错,貌似这个地方必须是 -MM-dd hh:mm:ss这种吗。 > > > > > > > > -- > > Best, Jingsong Lee > > -- Best, Jingsong Lee
Re: flink广播流
合理做法是open中把最初一波配置流加载好,然后广播流只是增量部分数据。 Yuepeng Pan 于2021年11月8日周一 上午10:11写道: > > > > Hi, 俊超. > 如果你指的是数据流必须在接受到一个或者多个ddl数据流才能够继续解析的话,那么你可以在ddl流到达算子之前,将数据流存入liststate,当接收到ddl类型的数据流元素后,先解析或处理 > liststate中的数据,而后继续处理当前与后续的来自数据流的元素。 > 也可以使用上述方式达到 ‘使用广播流的方式来提前加载mysql表结构的变化’ 的逻辑效果。 > >[1]. > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/state_backends/ >[2]. > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/broadcast_state/ > > > 祝好。 > > > Best, > Roc > > > > > 在 2021-11-08 09:44:52,"程俊超" 写道: > > >您好,我想使用广播流的方式来提前加载mysql表结构的变化(ddl),但是会遇到广播流没有数据流到达速度快的情况,导致问题。网上说可以使用liststate来解决这种情况,但是具体应该如何使用呢 > > > > > >| | > >程俊超 > >| > >| > >邮箱:c_18641943...@163.com > >| > > > >签名由 网易邮箱大师 定制 >
Re: MongoDB sink
Hi, 具体问题建议直接在相关ticket上进行讨论,邮件列表上可能相关人士没有注意到。 祝好 唐云 From: 不许人间见白头 Sent: Wednesday, November 10, 2021 22:28 To: user-zh Subject: MongoDB sink 你好, 请问一下,关于New feature: FLINK-24477 预计什么时候创建PR呢?
Re:Re:Re: flink 1.10 sql 读写 hive 2.1.0
您好! 再次补充下:报错的那一行代码是TableEnvironment tableEnv = TableEnvironment.create(settings); 直接运行的话,会报错; Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath. Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. The following properties are requested: class-name=org.apache.flink.table.planner.delegation.BlinkExecutorFactory streaming-mode=false The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.table.catalog.GenericInMemoryCatalogFactory org.apache.flink.table.module.CoreModuleFactory at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:238) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:185) at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:171) at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125) at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:203) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:11) 然后加了blink的依赖的包; 就出现了这样报错: Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86) at org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47) at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) 在 2021-11-11 10:16:40,"liuyehan" 写 不好意思,这个是全部的报错; Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86) at org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47) at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:13) 在 2021-11-11 10:06:06,"yidan zhao" 写道: >这报错信息没几句,光代码看不出来啥的。 > >liuyehan 于2021年11月11日周四 上午9:54写道: > >> 您好! >> >> >> 感谢您百忙之中抽空看我邮件; >> 目前问题: >> 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" >> java.lang.IncompatibleClassChangeError: Implementing class >> at java.lang.ClassLoader.defineClass1(Native Method) >> at java.lang.ClassLoader.defineClass(ClassLoader.java:763) >> >> >> pom.xml: >> >> org.apache.flink >> flink-connector-hive_2.11 >> 1.10.2 >> provided >> >> >> >> org.apache.flink >> flink-table-api-java-bridge_2.11 >> 1.10.2 >> provided >> >> >> >> >> org.apache.hive >> hive-exec
Re:Re: flink 1.10 sql 读写 hive 2.1.0
不好意思,这个是全部的报错; Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86) at org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47) at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:13) 在 2021-11-11 10:06:06,"yidan zhao" 写道: >这报错信息没几句,光代码看不出来啥的。 > >liuyehan 于2021年11月11日周四 上午9:54写道: > >> 您好! >> >> >> 感谢您百忙之中抽空看我邮件; >> 目前问题: >> 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" >> java.lang.IncompatibleClassChangeError: Implementing class >> at java.lang.ClassLoader.defineClass1(Native Method) >> at java.lang.ClassLoader.defineClass(ClassLoader.java:763) >> >> >> pom.xml: >> >> org.apache.flink >> flink-connector-hive_2.11 >> 1.10.2 >> provided >> >> >> >> org.apache.flink >> flink-table-api-java-bridge_2.11 >> 1.10.2 >> provided >> >> >> >> >> org.apache.hive >> hive-exec >> 2.1.0 >> provided >> >> >> >> 代码: >> EnvironmentSettings settings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); >> TableEnvironment tableEnv = TableEnvironment.create(settings); >> >> //EnvironmentSettings settings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); >> >>// TableEnvironment tableEnv = TableEnvironment.create(settings); >> >> String name= "myhive"; >> String defaultDatabase = "default"; >> String hiveConfDir = "/export/server/hive-2.1.0/conf"; >> >> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, >> hiveConfDir,"2.1.0"); >> tableEnv.registerCatalog("myhive", hive); >> >> // set the HiveCatalog as the current catalog of the session >> tableEnv.useCatalog("myhive"); >> Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit >> 10"); >> result.printSchema(); >> >> tableEnv.execute("Flink SQL"); >> >> >> >> >> >> >> >> >>
Re: flink 1.10 sql 读写 hive 2.1.0
这报错信息没几句,光代码看不出来啥的。 liuyehan 于2021年11月11日周四 上午9:54写道: > 您好! > > > 感谢您百忙之中抽空看我邮件; > 目前问题: > 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" > java.lang.IncompatibleClassChangeError: Implementing class > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:763) > > > pom.xml: > > org.apache.flink > flink-connector-hive_2.11 > 1.10.2 > provided > > > > org.apache.flink > flink-table-api-java-bridge_2.11 > 1.10.2 > provided > > > > > org.apache.hive > hive-exec > 2.1.0 > provided > > > > 代码: > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > > //EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > >// TableEnvironment tableEnv = TableEnvironment.create(settings); > > String name= "myhive"; > String defaultDatabase = "default"; > String hiveConfDir = "/export/server/hive-2.1.0/conf"; > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase, > hiveConfDir,"2.1.0"); > tableEnv.registerCatalog("myhive", hive); > > // set the HiveCatalog as the current catalog of the session > tableEnv.useCatalog("myhive"); > Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit > 10"); > result.printSchema(); > > tableEnv.execute("Flink SQL"); > > > > > > > > >
flink 1.10 sql 读写 hive 2.1.0
您好! 感谢您百忙之中抽空看我邮件; 目前问题: 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) pom.xml: org.apache.flink flink-connector-hive_2.11 1.10.2 provided org.apache.flink flink-table-api-java-bridge_2.11 1.10.2 provided org.apache.hive hive-exec 2.1.0 provided 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); // TableEnvironment tableEnv = TableEnvironment.create(settings); String name= "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/export/server/hive-2.1.0/conf"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,"2.1.0"); tableEnv.registerCatalog("myhive", hive); // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive"); Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit 10"); result.printSchema(); tableEnv.execute("Flink SQL");
flink 1.10 sql 读写 hive 2.1.0
您好! 感谢您百忙之中抽空看我邮件; 目前问题: 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) pom.xml: org.apache.flink flink-connector-hive_2.11 1.10.2 provided org.apache.flink flink-table-api-java-bridge_2.11 1.10.2 provided org.apache.hive hive-exec 2.1.0 provided 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); // TableEnvironment tableEnv = TableEnvironment.create(settings); String name= "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/export/server/hive-2.1.0/conf"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,"2.1.0"); tableEnv.registerCatalog("myhive", hive); // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive"); Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit 10"); result.printSchema(); tableEnv.execute("Flink SQL");
MongoDB sink
你好, 请问一下,关于New feature: FLINK-24477 预计什么时候创建PR呢?
Re: flinkSQL写hive表,timestamp-pattern设置,分区是yyyyMMdd而不是yyyy-MM-dd的情况怎么搞。
我在jira回复了下,我感觉还是能配置化好一些,那个liwei貌似现在加了个basicDate这个太单一了。 Jingsong Li 于2021年11月4日周四 下午12:18写道: > 你可以自定义个partition.time-extractor.class来自己解析 > > Flink应该搞个对应的partition.time-extractor.kind来默认支持你的需求。 > 建了个JIRA: https://issues.apache.org/jira/browse/FLINK-24758 > > Best, > Jingsong > > On Thu, Nov 4, 2021 at 11:47 AM yidan zhao wrote: > > > > 如题,我当前是select date_format(xxx, 'MMdd') as dt... > > > > partition.time-extractor.timestamp-pattern是$dt $hour:00:00这样。 > > > > 但是这样会导致报错,貌似这个地方必须是 -MM-dd hh:mm:ss这种吗。 > > > > -- > Best, Jingsong Lee >