回复:flink远程调用时环境变量问题

2021-11-25 文章 liuyehan
您好:
可以尝试下在/etc/profile文件里配置
export HADOOP_CLASS PATH=`hadoop classpath`
我是这样解决的。



| |
lyh1067341434
|
|
邮箱:lyh1067341...@163.com
|
发自 网易邮箱大师




 回复的原邮件 
| 发件人 | 王健<13166339...@163.com> |
| 日期 | 2021年11月26日 10:59 |
| 收件人 | user-zh |
| 抄送至 | |
| 主题 | flink远程调用时环境变量问题 |


大佬们:

远程调用flink启动任务,如何解决hadoop的环境变量问题呢,像java,hbase其他的环境变量都可以通过在flink-conf.yaml配置文件里配置,但是hadoop配置env.hadoop.conf.dir不起作用。
   可能是需要增加export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop 
classpath`,但是这个是不能在flink-conf.yaml配置吧


   急求解决,万分感谢



| |
王健
|
|
13166339...@163.com
|
签名由网易邮箱大师定制

Re:Re: Re: Re: 公司数据密文,实现group by和join

2021-11-10 文章 liuyehan
您好!


 不可以,因为这种加密算法有点特别,
比如id是密文的话,相同id的密文也不一样的,不管你这个密文的明文是不是一样,它的密文形成的字符串肯定不一样;
谢谢.

















在 2021-11-11 12:43:48,"yidan zhao"  写道:
>如果是group by id,不论id是明文还是密文,相同id的密文肯定也一样,直接group by 密文id不可以吗?
>
>godfrey he  于2021年11月1日周一 下午3:22写道:
>
>> 上传的图片没法显示,通过图床工具或纯文本方式重新发一遍
>>
>> lyh1067341434  于2021年11月1日周一 上午10:42写道:
>>
>> > 您好!
>> >
>> > 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组;
>> > 为了更清楚表达,下面为图示:
>> >
>> > 谢谢您!
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 在 2021-10-29 10:49:35,"Caizhi Weng"  写道:
>> > >Hi!
>> > >
>> > >你是不是想写这样的 SQL:
>> > >
>> > >SELECT id, sum(price) AS total_price FROM (
>> > >  SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2
>> ON
>> > >decrypt_udf(T1.id, T2.id) = 1
>> > >) GROUP BY id
>> > >
>> > >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。
>> > >
>> > >lyh1067341434  于2021年10月29日周五 上午9:41写道:
>> > >
>> > >> 您好!
>> > >>
>> > >>
>> > >>   感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了;
>> > >>
>> > >>
>> > >> 比如 有一张表 a, 表a有
>> > >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA");
>> > >>
>> > >>
>> > >> 如果我想求 不同id组的price之和:
>> > >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确;
>> > >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确;
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> 问题:
>> > >>
>> > >>
>> > >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确;
>> > >>我看到api只能指定分组的key是哪一个;
>> > >>
>> > >>
>> > >> 谢谢您!
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> 在 2021-10-28 11:09:26,"Caizhi Weng"  写道:
>> > >> >Hi!
>> > >> >
>> > >> >没太明白你的需求。你的需求是不是
>> > >> >
>> > >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。
>> > >> >
>> > >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。
>> > >> >
>> > >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。
>> > >> >
>> > >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by
>> > >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。
>> > >> >
>> > >> >[1]
>> > >> >
>> > >>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/
>> > >> >
>> > >> >lyh1067341...@163.com  于2021年10月27日周三
>> 下午5:20写道:
>> > >> >
>> > >> >> 您好:
>> > >> >>
>> > >>
>> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group
>> > >> >> by,对于使用spark和flink算子不知道如何实现。
>> > >> >>
>> > >> >> 问题:
>> > >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口)
>> > >> >>
>> > >> >> 谢谢您。
>> > >> >>
>> > >> >>
>> > >> >>
>> > >> >> 发自 网易邮箱大师
>> > >>
>> >
>> >
>> >
>> >
>> >
>>


Re:Re:Re: flink 1.10 sql 读写 hive 2.1.0

2021-11-10 文章 liuyehan
您好!


再次补充下:报错的那一行代码是TableEnvironment tableEnv = TableEnvironment.create(settings);


直接运行的话,会报错;
Exception in thread "main" 
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' 
in the classpath. Reason: No factory implements 
'org.apache.flink.table.delegation.ExecutorFactory'. The following properties 
are requested: 
class-name=org.apache.flink.table.planner.delegation.BlinkExecutorFactory 
streaming-mode=false The following factories have been considered: 
org.apache.flink.table.sources.CsvBatchTableSourceFactory 
org.apache.flink.table.sources.CsvAppendTableSourceFactory 
org.apache.flink.table.sinks.CsvBatchTableSinkFactory 
org.apache.flink.table.sinks.CsvAppendTableSinkFactory 
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory 
org.apache.flink.table.module.CoreModuleFactory at 
org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:238)
 at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:185)
 at 
org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:171)
 at 
org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
 at 
org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:203)
 at 
org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87) at 
hive.Flink_Hive_Test.main(Flink_Hive_Test.java:11) 然后加了blink的依赖的包;



就出现了这样报错:
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing 
class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47)
at 
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87)
















在 2021-11-11 10:16:40,"liuyehan"  写

不好意思,这个是全部的报错;
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing 
class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47)
at 
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87)
at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:13)

















在 2021-11-11 10:06:06,"yidan zhao"  写道:
>这报错信息没几句,光代码看不出来啥的。
>
>liuyehan  于2021年11月11日周四 上午9:54写道:
>
>> 您好!
>>
>>
>> 感谢您百忙之中抽空看我邮件;
>> 目前问题:
>> 使用看flink 1.10官网 hive部分,出现了Exception in thread "main"
>> java.lang.IncompatibleClassChangeError: Implementing class
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>
>>
>> pom.xml:
>> 
>> org.apache.flink
>> flink-connector-hive_2.11
>> 1.10.2
>>

Re:Re: flink 1.10 sql 读写 hive 2.1.0

2021-11-10 文章 liuyehan
不好意思,这个是全部的报错;
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing 
class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:86)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.(BatchPlanner.scala:47)
at 
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:52)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:208)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:87)
at hive.Flink_Hive_Test.main(Flink_Hive_Test.java:13)

















在 2021-11-11 10:06:06,"yidan zhao"  写道:
>这报错信息没几句,光代码看不出来啥的。
>
>liuyehan  于2021年11月11日周四 上午9:54写道:
>
>> 您好!
>>
>>
>> 感谢您百忙之中抽空看我邮件;
>> 目前问题:
>> 使用看flink 1.10官网 hive部分,出现了Exception in thread "main"
>> java.lang.IncompatibleClassChangeError: Implementing class
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>
>>
>> pom.xml:
>> 
>> org.apache.flink
>> flink-connector-hive_2.11
>> 1.10.2
>> provided
>> 
>>
>> 
>> org.apache.flink
>> flink-table-api-java-bridge_2.11
>> 1.10.2
>> provided
>> 
>>
>> 
>> 
>> org.apache.hive
>> hive-exec
>> 2.1.0
>> provided
>> 
>>
>>
>> 代码:
>>  EnvironmentSettings settings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>
>> //EnvironmentSettings settings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>
>>// TableEnvironment tableEnv = TableEnvironment.create(settings);
>>
>> String name= "myhive";
>> String defaultDatabase = "default";
>> String hiveConfDir = "/export/server/hive-2.1.0/conf";
>>
>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
>> hiveConfDir,"2.1.0");
>> tableEnv.registerCatalog("myhive", hive);
>>
>> // set the HiveCatalog as the current catalog of the session
>> tableEnv.useCatalog("myhive");
>> Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit
>> 10");
>> result.printSchema();
>>
>> tableEnv.execute("Flink SQL");
>>
>>
>>
>>
>>
>>
>>
>>
>>


flink 1.10 sql 读写 hive 2.1.0

2021-11-10 文章 liuyehan
您好!


感谢您百忙之中抽空看我邮件;
目前问题:
使用看flink 1.10官网 hive部分,出现了Exception in thread "main" 
java.lang.IncompatibleClassChangeError: Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)


pom.xml:

org.apache.flink
flink-connector-hive_2.11
1.10.2
provided



org.apache.flink
flink-table-api-java-bridge_2.11
1.10.2
provided




org.apache.hive
hive-exec
2.1.0
provided



代码:
 EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

//EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

   // TableEnvironment tableEnv = TableEnvironment.create(settings);

String name= "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/export/server/hive-2.1.0/conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,"2.1.0");
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit 10");
result.printSchema();

tableEnv.execute("Flink SQL");








 

flink 1.10 sql 读写 hive 2.1.0

2021-11-10 文章 liuyehan
您好!


感谢您百忙之中抽空看我邮件;
目前问题:
使用看flink 1.10官网 hive部分,出现了Exception in thread "main" 
java.lang.IncompatibleClassChangeError: Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)


pom.xml:

org.apache.flink
flink-connector-hive_2.11
1.10.2
provided



org.apache.flink
flink-table-api-java-bridge_2.11
1.10.2
provided




org.apache.hive
hive-exec
2.1.0
provided



代码:
 EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

//EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

   // TableEnvironment tableEnv = TableEnvironment.create(settings);

String name= "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/export/server/hive-2.1.0/conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,"2.1.0");
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
Table result = tableEnv.sqlQuery("select * from exp_2_mysql_table limit 10");
result.printSchema();

tableEnv.execute("Flink SQL");