回复: 关于flink1.10.0使用HiveModule的问题

2020-05-08 文章 like
Hi, Jingsong Lee

非常感谢你的回答!

在不使用正则的情况下我用REGEXP替换REGEXP_REPLACE,暂时解决了该问题,
后续会按你提供的方案进行解决,还请不吝赐教,多谢!






在2020年5月8日 18:14,Jingsong Li 写道:
Hi,

本来应该使用Flink内置的函数的,但是目前由于有bug[1],使用到了HiveModule。
一个简单的办法是实现一个你的Module,继承自HiveModule,overwrite
getFunctionDefinition方法,REGEXP_REPLACE时返回empty。

[1]https://issues.apache.org/jira/browse/FLINK-15595

Best,
Jingsong Lee

On Fri, May 8, 2020 at 5:19 PM like  wrote:


最初因为使用了hive中自定义的UDF,所以我注册了hive的catalog,后面又要用到hive的内置函数,所以又用了HiveModule,代码如下:


val hive = new HiveCatalog("hive", "default", "/etc/hive_config", "1.2.1")
tableEnv.registerCatalog("hive", hive)
tableEnv.useCatalog("hive")
tableEnv.useDatabase("default")
tableEnv.loadModule("myhive", new HiveModule("1.2.1"))




在2020年5月8日 16:30,Jingsong Li 写道:
Hi,

你是怎么用HiveModule的?还保留了CoreModule吗?

Best,
Jingsong Lee

On Fri, May 8, 2020 at 4:14 PM like  wrote:

各位大佬好,
目前我在使用HiveModule的过程中碰到了一些问题,在未使用HiveModule的时候用了REGEXP_REPLACE
函数,这是可以正常使用的,在使用HiveModule之后却出现了问题,当我把字符替换为空的时候就会报错
REGEXP_REPLACE('abcd', 'a', ''),大家有碰到这个问题吗?或者我能选择使用flink或者hive的内置函数吗?




异常堆栈信息如下:


org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: SQL validation failed.
java.lang.reflect.InvocationTargetException
at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at

org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at

org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at

org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at
org.apache.flink.client.cli.CliFrontend$$Lambda$38/1264413185.call(Unknown
Source)
at

org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/1243806178.run(Unknown
Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at

org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation
failed. java.lang.reflect.InvocationTargetException
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org

$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at

org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at

org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at

org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at

org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88)
at

com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine$$$Lambda$765/1756039478.apply(Unknown
Source)
at scala.collection.immutable.List.foreach(List.scala:388)
at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 13 more
Caused by: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at

org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at

org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
at

org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction$$Lambda$780/1747631271.inferReturnType(Unknown
Source)
at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at

org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at

org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall

回复: 关于flink1.10.0使用HiveModule的问题

2020-05-08 文章 like
最初因为使用了hive中自定义的UDF,所以我注册了hive的catalog,后面又要用到hive的内置函数,所以又用了HiveModule,代码如下:


val hive = new HiveCatalog("hive", "default", "/etc/hive_config", "1.2.1")
tableEnv.registerCatalog("hive", hive)
tableEnv.useCatalog("hive")
tableEnv.useDatabase("default")
tableEnv.loadModule("myhive", new HiveModule("1.2.1"))




在2020年5月8日 16:30,Jingsong Li 写道:
Hi,

你是怎么用HiveModule的?还保留了CoreModule吗?

Best,
Jingsong Lee

On Fri, May 8, 2020 at 4:14 PM like  wrote:

各位大佬好,
目前我在使用HiveModule的过程中碰到了一些问题,在未使用HiveModule的时候用了REGEXP_REPLACE
函数,这是可以正常使用的,在使用HiveModule之后却出现了问题,当我把字符替换为空的时候就会报错
REGEXP_REPLACE('abcd', 'a', ''),大家有碰到这个问题吗?或者我能选择使用flink或者hive的内置函数吗?




异常堆栈信息如下:


org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: SQL validation failed.
java.lang.reflect.InvocationTargetException
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at
org.apache.flink.client.cli.CliFrontend$$Lambda$38/1264413185.call(Unknown
Source)
at
org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/1243806178.run(Unknown
Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation
failed. java.lang.reflect.InvocationTargetException
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88)
at
com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine$$$Lambda$765/1756039478.apply(Unknown
Source)
at scala.collection.immutable.List.foreach(List.scala:388)
at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 13 more
Caused by: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
at
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction$$Lambda$780/1747631271.inferReturnType(Unknown
Source)
at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
at
org.apache.calcite.sql.type.InferTypes$$Lambda$169/1269773610.inferOperandTy

关于flink1.10.0使用HiveModule的问题

2020-05-08 文章 like
各位大佬好,
  目前我在使用HiveModule的过程中碰到了一些问题,在未使用HiveModule的时候用了REGEXP_REPLACE
函数,这是可以正常使用的,在使用HiveModule之后却出现了问题,当我把字符替换为空的时候就会报错
REGEXP_REPLACE('abcd', 'a', ''),大家有碰到这个问题吗?或者我能选择使用flink或者hive的内置函数吗?




异常堆栈信息如下:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: SQL validation failed. 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.client.cli.CliFrontend$$Lambda$38/1264413185.call(Unknown 
Source)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/1243806178.run(Unknown
 Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
failed. java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88)
at 
com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine$$$Lambda$765/1756039478.apply(Unknown Source)
at scala.collection.immutable.List.foreach(List.scala:388)
at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 13 more
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at 
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
at 
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction$$Lambda$780/1747631271.inferReturnType(Unknown
 Source)
at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
at 
org.apache.calcite.sql.type.InferTypes$$Lambda$169/1269773610.inferOperandTypes(Unknown
 Source)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1865)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1873)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4040)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3181)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.scala:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(Sql

使用Flink1.10.0 history无法访问的问题

2020-04-06 文章 like
大家好:
  我使用的是Flink1.10.0,部署了history服务,刚开始服务正常,运行了几天之后,history页面无法打开,发现 /config 
接口报 404,history的服务进程没有挂掉,log日志中无任何异常信息,请问一下大家有没有碰到这样的异常?

回复: 使用Flink1.10.0读取hive时source并行度问题

2020-03-01 文章 like
非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。




在2020年3月2日 15:18,JingsongLee 写道:
Hi,

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?

使用Flink1.10.0读取hive时source并行度问题

2020-03-01 文章 like
hi,大家好

  我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?

回复: 关于1.9使用hive中的udf

2019-09-26 文章 like
非常感谢,但是我试过的 hive.xx_db.xx_udf 这种方式是找不到这个udf的,必须使用 tableEnv.useCatalog("hive") 
、tableEnv.useDatabase("default") 




在2019年9月26日 16:43,Terry Wang 写道:

问题1:
default关键词报错是否试过   hive.`default`.xx_udf 方式, 这样转义应该能解决关键词报错的问题。

问题2:
flink 1.10 中会支持modular plugin的方式,使用起来会更方便


Best,
Terry Wang



在 2019年9月25日,下午7:21,like  写道:

各位大佬好:
目前我在使用1.9版本中hive的udf碰到如下问题:
1、hive的udf都是注册在default库中,sql里面带有default关键词,flink程序就会报错
我通过 tableEnv.useCatalog("hive") 
、tableEnv.useDatabase("default")这种方式解决了default关键词的问题
同时发现如果不使用tableEnv.useDatabase("xx_db"),直接使用  xx_db.fun是找不到函数的


2、使用上面的方式能使用hive中指定某个库的udf,但是需要使用flink中注册的表会很麻烦
sql里需要这么写(default_catalog.default_database.xx_table)


请问大家有没有好的使用方式和建议?感谢 !  


关于1.9使用hive中的udf

2019-09-25 文章 like
各位大佬好:
目前我在使用1.9版本中hive的udf碰到如下问题:
1、hive的udf都是注册在default库中,sql里面带有default关键词,flink程序就会报错
我通过 tableEnv.useCatalog("hive") 
、tableEnv.useDatabase("default")这种方式解决了default关键词的问题
同时发现如果不使用tableEnv.useDatabase("xx_db"),直接使用  xx_db.fun是找不到函数的


2、使用上面的方式能使用hive中指定某个库的udf,但是需要使用flink中注册的表会很麻烦
sql里需要这么写(default_catalog.default_database.xx_table)


 请问大家有没有好的使用方式和建议?感谢 !  

回复: kafka流与hive表join问题

2019-08-27 文章 like
Hi Jark


非常感谢你提供的方案,我不了解udtf加载完后是一种什么情况,是每一个task都会维护一份hive数据吗?这应该会带来很大的开销。还有一个问题就是1.9中的维表join是不是跟现在一样呢?有什么特别的地方吗?


在2019年8月28日 10:10,Jark Wu 写道:
Hi,

看了你的问题,主要有两个问题。
1. join hive 维表,没加载完就有 join 输出了。
2. hive 加载完后,就不再做 checkpoint 了。

第一个问题,目前flink 还没有内置支持hive 维表的支持。你可以自己实现一个 udtf 去拉取 hive 数据到内存,udtf 的 eval 
方法在加载完 hive 数据之前不返回,这样可以避免没有加载完就有输出的问题。
第二个问题,目前 streaming job 中如果存在 finish vertex,是无法做 checkpoint 的。


Best,
Jark



在 2019年8月27日,17:41,like  写道:

我通过hive union 
kafka数据来解决hive数据不更新的问题,我现在碰到的问题是hive没有加载完就开始有join数据输出,比如我取最大日期,hive读取到26号就输出了26,过一会读取到27号,又会输出27,有没有办法让hive全部加载完再join输出,这样就只会有一个值输出了?


在2019年8月27日 17:33,苏 欣 写道:
我之前试过两种方式,但都有各自的问题:
1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。
2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联kafka,但是这种方式可以做到实时加载hive数据,但是效率比较低,尤其是读取数据量较大的hive表时。


sean...@live.com

发件人: like<mailto:likeg...@163.com>
发送时间: 2019-08-27 17:15
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: kafka流与hive表join问题
请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?






maven配置错误百出

2019-08-27 文章 like
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/



回复: kafka流与hive表join问题

2019-08-27 文章 like
我通过hive union 
kafka数据来解决hive数据不更新的问题,我现在碰到的问题是hive没有加载完就开始有join数据输出,比如我取最大日期,hive读取到26号就输出了26,过一会读取到27号,又会输出27,有没有办法让hive全部加载完再join输出,这样就只会有一个值输出了?


在2019年8月27日 17:33,苏 欣 写道:
我之前试过两种方式,但都有各自的问题:
1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。
2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联kafka,但是这种方式可以做到实时加载hive数据,但是效率比较低,尤其是读取数据量较大的hive表时。


sean...@live.com

发件人: like<mailto:likeg...@163.com>
发送时间: 2019-08-27 17:15
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: kafka流与hive表join问题
请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?





回复: kafka流与hive表join问题

2019-08-27 文章 like
我通过HCatInputFormat读取了hive的数据注册了一张表,然后读取kafka的数据也注册了一张表,join就是通过sql写的,没有什么代码逻辑呢。


| |
like
|
|
likeg...@163.com
|
签名由网易邮箱大师定制
在2019年8月27日 17:17,Jeff Zhang 写道:
你是怎么join hive表的,能share你的代码吗?

like  于2019年8月27日周二 下午5:15写道:

请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
SYSTEM_TIME AS OF
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?





--
Best Regards

Jeff Zhang


kafka流与hive表join问题

2019-08-27 文章 like
请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?





kafka流与hive表join问题

2019-08-27 文章 like
请问一下,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?


| |
like
|
|
likeg...@163.com
|
签名由网易邮箱大师定制