回复:flink远程调用时环境变量问题
您好: 可以尝试下在/etc/profile文件里配置 export HADOOP_CLASS PATH=`hadoop classpath` 我是这样解决的。 | | lyh1067341434 | | 邮箱:lyh1067341...@163.com | 发自 网易邮箱大师 回复的原邮件 | 发件人 | 王健<13166339...@163.com> | | 日期 | 2021年11月26日 10:59 | | 收件人 | user-zh | | 抄送至 | | | 主题 | flink远程调用时环境变量问题 | 大佬们: 远程调用flink启动任务,如何解决hadoop的环境变量问题呢,像java,hbase其他的环境变量都可以通过在flink-conf.yaml配置文件里配置,但是hadoop配置env.hadoop.conf.dir不起作用。 可能是需要增加export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`,但是这个是不能在flink-conf.yaml配置吧 急求解决,万分感谢 | | 王健 | | 13166339...@163.com | 签名由网易邮箱大师定制
Re:Re: Re: Re: 公司数据密文,实现group by和join
您好! 不可以,因为这种加密算法有点特别, 比如id是密文的话,相同id的密文也不一样的,不管你这个密文的明文是不是一样,它的密文形成的字符串肯定不一样; 谢谢. 在 2021-11-11 12:43:48,"yidan zhao" 写道: >如果是group by id,不论id是明文还是密文,相同id的密文肯定也一样,直接group by 密文id不可以吗? > >godfrey he 于2021年11月1日周一 下午3:22写道: > >> 上传的图片没法显示,通过图床工具或纯文本方式重新发一遍 >> >> lyh1067341434 于2021年11月1日周一 上午10:42写道: >> >> > 您好! >> > >> > 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组; >> > 为了更清楚表达,下面为图示: >> > >> > 谢谢您! >> > >> > >> > >> > >> > >> > >> > >> > 在 2021-10-29 10:49:35,"Caizhi Weng" 写道: >> > >Hi! >> > > >> > >你是不是想写这样的 SQL: >> > > >> > >SELECT id, sum(price) AS total_price FROM ( >> > > SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 >> ON >> > >decrypt_udf(T1.id, T2.id) = 1 >> > >) GROUP BY id >> > > >> > >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。 >> > > >> > >lyh1067341434 于2021年10月29日周五 上午9:41写道: >> > > >> > >> 您好! >> > >> >> > >> >> > >> 感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了; >> > >> >> > >> >> > >> 比如 有一张表 a, 表a有 >> > >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA"); >> > >> >> > >> >> > >> 如果我想求 不同id组的price之和: >> > >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确; >> > >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确; >> > >> >> > >> >> > >> >> > >> >> > >> 问题: >> > >> >> > >> >> > >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确; >> > >>我看到api只能指定分组的key是哪一个; >> > >> >> > >> >> > >> 谢谢您! >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> 在 2021-10-28 11:09:26,"Caizhi Weng" 写道: >> > >> >Hi! >> > >> > >> > >> >没太明白你的需求。你的需求是不是 >> > >> > >> > >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。 >> > >> > >> > >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。 >> > >> > >> > >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。 >> > >> > >> > >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by >> > >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。 >> > >> > >> > >> >[1] >> > >> > >> > >> >> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/ >> > >> > >> > >> >lyh1067341...@163.com 于2021年10月27日周三 >> 下午5:20写道: >> > >> > >> > >> >> 您好: >> > >> >> >> > >> >> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group >> > >> >> by,对于使用spark和flink算子不知道如何实现。 >> > >> >> >> > >> >> 问题: >> > >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口) >> > >> >> >> > >> >> 谢谢您。 >> > >> >> >> > >> >> >> > >> >> >> > >> >> 发自 网易邮箱大师 >> > >> >> > >> > >> > >> > >> > >>
Re:Re:Re: flink 1.10 sql 读写 hive 2.1.0
您好! 再次补充下:报错的那一行代码是TableEnvironment tableEnv = TableEnvironment.create(settings); 直接运行的话,会报错; Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath. Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. The following properties are requested: class-name=org.apache.flink.table.planner.delegation.BlinkExecutorFactory streaming-mode=false The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.table.catalog.GenericInMemoryCatalogFactory org.apache.flink.table.module.CoreModuleFactory at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:238) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:185) at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:171) at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125) at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:203) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:11) 然后加了blink的依赖的包; 就出现了这样报错: Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86) at org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47) at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) 在 2021-11-11 10:16:40,"liuyehan" 写 不好意思,这个是全部的报错; Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86) at org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47) at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:13) 在 2021-11-11 10:06:06,"yidan zhao" 写道: >这报错信息没几句,光代码看不出来啥的。 > >liuyehan 于2021年11月11日周四 上午9:54写道: > >> 您好! >> >> >> 感谢您百忙之中抽空看我邮件; >> 目前问题: >> 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" >> java.lang.IncompatibleClassChangeError: Implementing class >> at java.lang.ClassLoader.defineClass1(Native Method) >> at java.lang.ClassLoader.defineClass(ClassLoader.java:763) >> >> >> pom.xml: >> >> org.apache.flink >> flink-connector-hive_2.11 >> 1.10.2 >>
Re:Re: flink 1.10 sql 读写 hive 2.1.0
不好意思,这个是全部的报错; Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86) at org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47) at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208) at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:13) 在 2021-11-11 10:06:06,"yidan zhao" 写道: >这报错信息没几句,光代码看不出来啥的。 > >liuyehan 于2021年11月11日周四 上午9:54写道: > >> 您好! >> >> >> 感谢您百忙之中抽空看我邮件; >> 目前问题: >> 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" >> java.lang.IncompatibleClassChangeError: Implementing class >> at java.lang.ClassLoader.defineClass1(Native Method) >> at java.lang.ClassLoader.defineClass(ClassLoader.java:763) >> >> >> pom.xml: >> >> org.apache.flink >> flink-connector-hive_2.11 >> 1.10.2 >> provided >> >> >> >> org.apache.flink >> flink-table-api-java-bridge_2.11 >> 1.10.2 >> provided >> >> >> >> >> org.apache.hive >> hive-exec >> 2.1.0 >> provided >> >> >> >> 代码: >> EnvironmentSettings settings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); >> TableEnvironment tableEnv = TableEnvironment.create(settings); >> >> //EnvironmentSettings settings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); >> >>// TableEnvironment tableEnv = TableEnvironment.create(settings); >> >> String name= "myhive"; >> String defaultDatabase = "default"; >> String hiveConfDir = "/export/server/hive-2.1.0/conf"; >> >> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, >> hiveConfDir,"2.1.0"); >> tableEnv.registerCatalog("myhive", hive); >> >> // set the HiveCatalog as the current catalog of the session >> tableEnv.useCatalog("myhive"); >> Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit >> 10"); >> result.printSchema(); >> >> tableEnv.execute("Flink SQL"); >> >> >> >> >> >> >> >> >>
flink 1.10 sql 读写 hive 2.1.0
您好! 感谢您百忙之中抽空看我邮件; 目前问题: 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) pom.xml: org.apache.flink flink-connector-hive_2.11 1.10.2 provided org.apache.flink flink-table-api-java-bridge_2.11 1.10.2 provided org.apache.hive hive-exec 2.1.0 provided 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); // TableEnvironment tableEnv = TableEnvironment.create(settings); String name= "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/export/server/hive-2.1.0/conf"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,"2.1.0"); tableEnv.registerCatalog("myhive", hive); // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive"); Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit 10"); result.printSchema(); tableEnv.execute("Flink SQL");
flink 1.10 sql 读写 hive 2.1.0
您好! 感谢您百忙之中抽空看我邮件; 目前问题: 使用看flink 1.10官网 hive部分,出现了Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) pom.xml: org.apache.flink flink-connector-hive_2.11 1.10.2 provided org.apache.flink flink-table-api-java-bridge_2.11 1.10.2 provided org.apache.hive hive-exec 2.1.0 provided 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); // TableEnvironment tableEnv = TableEnvironment.create(settings); String name= "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/export/server/hive-2.1.0/conf"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,"2.1.0"); tableEnv.registerCatalog("myhive", hive); // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive"); Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit 10"); result.printSchema(); tableEnv.execute("Flink SQL");