Re: flink yarn-perjob: submitted job fails to start

2021-03-15 Post by lian
Two possibilities:
Case 1: the jar was not packaged completely; try repackaging it.
Case 2: a dependency is missing.


On 2021-03-15 21:59, 刘朋强 wrote:
Problem:
I submit the job to yarn-cluster with the following command:
flink run -m yarn-cluster -yjm 1024m -ytm 2048m -c 
org.apache.flink.streaming.examples.wordcount.WordCount 
/home/lpq/flink-examples-streaming_2.11.jar
In the Flink UI the TaskManager count is always 0, the job never starts, and there is no error message; I don't know how to troubleshoot this.
[yarn UI screenshot]
[flink UI screenshot]


yarn container log
down cluster because application is in SUCCEEDED, diagnostics null.
2021-03-15 21:55:47,330 INFO  org.apache.flink.yarn.YarnResourceManagerDriver   
   [] - Unregister application from the YARN Resource Manager with 
final status SUCCEEDED.
2021-03-15 21:55:47,344 INFO  
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl[] - Waiting for 
application to be successfully unregistered.
2021-03-15 21:55:48,035 INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2021-03-15 21:55:48,036 INFO  
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
Stopping JobDispatcherLeaderProcess.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Stopping 
dispatcher akka.tcp://flink@bdp:38344/user/rpc/dispatcher_1.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Stopping all 
currently running jobs of dispatcher 
akka.tcp://flink@bdp:38344/user/rpc/dispatcher_1.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator
 [] - Shutting down back pressure request coordinator.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Stopped 
dispatcher akka.tcp://flink@bdp:38344/user/rpc/dispatcher_1.
2021-03-15 21:55:48,040 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy [] - 
Opening proxy : bdp:33576
2021-03-15 21:55:48,041 INFO  
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - 
Interrupted while waiting for queue
java.lang.InterruptedException: null
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 ~[?:1.8.0_121]
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 ~[?:1.8.0_121]
at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
~[?:1.8.0_121]
at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
 [hadoop-yarn-client-2.7.7.jar:?]
2021-03-15 21:55:48,066 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
Closing the SlotManager.
2021-03-15 21:55:48,066 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
Suspending the SlotManager.
2021-03-15 21:55:48,069 INFO  org.apache.flink.runtime.blob.BlobServer  
   [] - Stopped BLOB server at 0.0.0.0:34775
2021-03-15 21:55:48,069 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopping Akka RPC service.
2021-03-15 21:55:48,076 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopping Akka RPC service.
2021-03-15 21:55:48,093 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down 
remote daemon.
2021-03-15 21:55:48,093 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down 
remote daemon.
2021-03-15 21:55:48,093 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remote daemon 
shut down; proceeding with flushing remote transports.
2021-03-15 21:55:48,095 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remote daemon 
shut down; proceeding with flushing remote transports.
2021-03-15 21:55:48,111 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.
2021-03-15 21:55:48,119 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.
2021-03-15 21:55:48,129 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopped Akka RPC service.
2021-03-15 21:55:48,130 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopped Akka RPC service.
2021-03-15 21:55:48,132 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Terminating 
cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.


Re: Some questions about using pyflink

2021-03-15 Post by xiaoyue
Hello,
We are also evaluating and experimenting with pyflink plus pandas for distributed computation, so I can only share some experience on your questions.
pyflink should eventually converge on DataStream, so DataSet is unlikely to ever be supported;
I don't recommend converting between Table and DataFrame with table.to_pandas() in the middle of a computation, as it hurts performance;
The most efficient approach we have found so far is to define pandas-type UDFs/UDAFs, but compared with doing the same thing through the Java API, pyflink is still much slower;
Personally I find that most of pyflink's time goes into the sql_query operations; for the same SQL statement, execution efficiency still differs a lot from Java.
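For reference, a minimal sketch of the pandas UDF approach mentioned above (assuming Flink 1.12+ with pandas and pyarrow installed; the function, view, and column names are invented for the example):

from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Vectorized (pandas) UDF: a and b arrive as pandas.Series, so the addition
# below runs once per batch of rows instead of once per row.
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def vec_add(a, b):
    return a + b

table_env.create_temporary_function("vec_add", vec_add)
t = table_env.from_elements([(1.0, 2.0), (3.0, 4.0)], ['x', 'y'])
table_env.create_temporary_view("my_source", t)
table_env.sql_query("SELECT vec_add(x, y) AS s FROM my_source").execute().print()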
The above is just my personal experience; if anything is wrong, corrections are very welcome.
Also, since we are researching the same area, I'd be glad to exchange any new findings. Thanks, and best wishes.




xiao...@ysstech.com
 
From: qian he
Sent: 2021-03-14 18:59
To: user-zh-flink
Subject: Some questions about using pyflink
Hi,
 
We would like to use Flink for distributed computation in a recent project. The existing project is a Python pandas project, and we want to try migrating it to pyflink. When using DataSet for batch processing, the map/reduce functions that exist in the Java version are not available, so I have the following questions:
1. Does the Python Flink SDK not support DataSet yet?
2. Is there an alternative?
3. If it is not supported yet, is there a planned timeline for supporting it?
4. Why doesn't the Flink Table API support map/reduce operations?
5. Our project processes data with DataFrames; can that be run on Flink as a distributed computation? If a DataFrame is converted directly to a Table, the Table does not support map/reduce, so what is the recommended way to migrate a pandas project to Flink?
6. Why doesn't the DataStream API implement the window methods? Will a later version support them?
 
Thank you very much. I'm very optimistic about Flink and hope the community keeps growing. Thanks for all the hard work!


Connection reset by peer

2021-03-15 Post by yidan zhao
The job hit an exception and restarted automatically. The log is below; could someone help me analyze the problem?
2021-03-16 00:00:06
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection reset by peer (connection to '
10.35.100.171/10.35.100.171:2016')
at org.apache.flink.runtime.io.network.netty.
CreditBasedPartitionRequestClientHandler.exceptionCaught(
CreditBasedPartitionRequestClientHandler.java:173)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:281)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireExceptionCaught(
AbstractChannelHandlerContext.java:273)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline
.java:1377)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:281)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(
AbstractEpollStreamChannel.java:728)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
AbstractEpollStreamChannel.java:818)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(
AbstractEpollChannel.java:442)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
.processReady(EpollEventLoop.java:482)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
.run(EpollEventLoop.java:378)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.
ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.
Errors$NativeIoException: readAddress(..) failed: Connection reset by peer


Re: Flink 1.12: submitting a job in yarn-application mode fails

2021-03-15 Post by todd
I set the following options in the Flink YAML config file:
HADOOP_CONF_DIR:
execution.target: yarn-application
yarn.provided.lib.dirs: hdfs://...
pipeline.jars: hdfs://...

So I'm not sure how you configured yarn-application on your side.





Re: PyFlink DataSet lacks the corresponding map/reduce functions

2021-03-15 Post by Dian Fu
Hi, 

A couple of questions:
1) What exactly do you mean by map/reduce functions? Could you give an example?
2) By DataSet API do you mean the Java DataSet API? Note that the Java DataSet API will be gradually deprecated and unified onto the DataStream 
API, so PyFlink will not support the DataSet API; it only supports the Python Table API and the Python DataStream API.
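For reference, a DataSet-style map/reduce such as word count can usually be expressed with the Python Table API, where expressions in the SELECT clause act as the "map" step and a grouped aggregation acts as the "reduce" step. A minimal sketch (the environment setup and names below are assumptions, not from this thread):

from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# "map": the projection in the SELECT clause; "reduce": the grouped aggregation.
words = t_env.from_elements([('hello',), ('flink',), ('hello',)], ['word'])
t_env.create_temporary_view("words", words)
t_env.sql_query("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word").execute().print()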


> On 2021-03-13 10:54, nova.he wrote:
> 
> Hi,
> 
> We would like to use Flink for distributed computation in a recent project. The existing project is a Python pandas project, and we want to try migrating
> it to pyflink. When using DataSet for batch processing, there are no map/reduce functions, so I have the following questions:
> 1. Does the Python Flink SDK not support DataSet yet?
> 2. Is there an alternative?
> 3. If it is not supported yet, is there a planned timeline for supporting it?
> 4. Why doesn't the Flink Table API support map/reduce operations?
> 5. Our project processes data with DataFrames; can that be run on Flink as a distributed computation? If a DataFrame is converted directly to a Table,
> the Table does not support map/reduce, so what is the recommended way to migrate a pandas project to Flink?
> 
> 
> Thank you very much. I'm very optimistic about Flink and hope the community keeps growing. Thanks for all the hard work!
> 
> 
> nova.he
> nova...@qq.com



Are there any examples of Flink + Hudi or Iceberg with Aliyun OSS?

2021-03-15 Post by casel.chen
Are there any examples of using Flink with Hudi or Iceberg on Aliyun OSS? Thanks!

flink yarn-perjob: submitted job fails to start

2021-03-15 Post by 刘朋强
Problem:
I submit the job to yarn-cluster with the following command:
flink run -m yarn-cluster -yjm 1024m -ytm 2048m -c 
org.apache.flink.streaming.examples.wordcount.WordCount 
/home/lpq/flink-examples-streaming_2.11.jar
In the Flink UI the TaskManager count is always 0, the job never starts, and there is no error message; I don't know how to troubleshoot this.
[yarn UI screenshot]
[flink UI screenshot]

yarn container log
down cluster because application is in SUCCEEDED, diagnostics null.
2021-03-15 21:55:47,330 INFO  org.apache.flink.yarn.YarnResourceManagerDriver   
   [] - Unregister application from the YARN Resource Manager with 
final status SUCCEEDED.
2021-03-15 21:55:47,344 INFO  
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl[] - Waiting for 
application to be successfully unregistered.
2021-03-15 21:55:48,035 INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2021-03-15 21:55:48,036 INFO  
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
Stopping JobDispatcherLeaderProcess.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Stopping 
dispatcher akka.tcp://flink@bdp:38344/user/rpc/dispatcher_1.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Stopping all 
currently running jobs of dispatcher 
akka.tcp://flink@bdp:38344/user/rpc/dispatcher_1.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator
 [] - Shutting down back pressure request coordinator.
2021-03-15 21:55:48,037 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Stopped 
dispatcher akka.tcp://flink@bdp:38344/user/rpc/dispatcher_1.
2021-03-15 21:55:48,040 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy [] - 
Opening proxy : bdp:33576
2021-03-15 21:55:48,041 INFO  
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - 
Interrupted while waiting for queue
java.lang.InterruptedException: null
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 ~[?:1.8.0_121]
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 ~[?:1.8.0_121]
at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
~[?:1.8.0_121]
at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
 [hadoop-yarn-client-2.7.7.jar:?]
2021-03-15 21:55:48,066 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
Closing the SlotManager.
2021-03-15 21:55:48,066 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
Suspending the SlotManager.
2021-03-15 21:55:48,069 INFO  org.apache.flink.runtime.blob.BlobServer  
   [] - Stopped BLOB server at 0.0.0.0:34775
2021-03-15 21:55:48,069 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopping Akka RPC service.
2021-03-15 21:55:48,076 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopping Akka RPC service.
2021-03-15 21:55:48,093 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down 
remote daemon.
2021-03-15 21:55:48,093 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down 
remote daemon.
2021-03-15 21:55:48,093 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remote daemon 
shut down; proceeding with flushing remote transports.
2021-03-15 21:55:48,095 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remote daemon 
shut down; proceeding with flushing remote transports.
2021-03-15 21:55:48,111 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.
2021-03-15 21:55:48,119 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.
2021-03-15 21:55:48,129 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopped Akka RPC service.
2021-03-15 21:55:48,130 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Stopped Akka RPC service.
2021-03-15 21:55:48,132 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Terminating 
cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.


Re: Flink shuffle vs rebalance

2021-03-15 Post by Kezhu Wang
ShufflePartitioner:

  public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
      return random.nextInt(numberOfChannels);
  }

RebalancePartitioner:

  public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
      nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
      return nextChannelToSendTo;
  }


One picks a channel at random; the other is strict round-robin.
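A quick usage sketch in PyFlink (assuming a release whose DataStream API exposes shuffle() and rebalance(); added for illustration, not part of the original reply):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

ds = env.from_collection([1, 2, 3, 4, 5, 6, 7, 8], type_info=Types.INT())

# shuffle(): each record is sent to a randomly chosen downstream channel
ds.shuffle().print()

# rebalance(): records are distributed across channels in strict round-robin order
ds.rebalance().print()

env.execute("shuffle_vs_rebalance")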


Best,
Kezhu Wang

On March 15, 2021 at 22:02:33, 赢峰 (si_ji_f...@163.com) wrote:



What is the difference between the shuffle partitioner and the rebalance partitioner in Flink?


Flink shuffle vs rebalance

2021-03-15 Post by 赢峰


What is the difference between the shuffle partitioner and the rebalance partitioner in Flink?



Re: Flink 1.12: submitting a job in yarn-application mode fails

2021-03-15 Post by Congxian Qiu
Hi
From your log, the job fails to start because of:
   Caused by: java.lang.IllegalArgumentException: Wrong FS:
   hdfs://xx/flink120/, expected: file:///
   It looks like the path you configured does not match the expected filesystem scheme (the hdfs:// path is being resolved against the local filesystem); you need to sort that out first.

Best,
Congxian


todd wrote on Mon, Mar 15, 2021 at 2:22 PM:

> I submit the Flink job with a script; the command is:
> /bin/flink run-application -t yarn-application
> -Dyarn.provided.lib.dirs="hdfs://xx/flink120/" hdfs://xx/flink-example.jar
> --sqlFilePath   /xxx/kafka2print.sql
>
> The libs used by Flink and the user jar have already been uploaded to the HDFS path, but the following error is thrown:
> ---
>  The program finished with the following exception:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
> at
>
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
> at
>
> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
> Caused by: java.lang.IllegalArgumentException: Wrong FS:
> hdfs://xx/flink120/, expected: file:///
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:648)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
> at
>
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:428)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1425)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.lambda$getAllFilesInProvidedLibDirs$2(YarnApplicationFileUploader.java:469)
> at
>
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> at java.util.ArrayList.forEach(ArrayList.java:1257)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.getAllFilesInProvidedLibDirs(YarnApplicationFileUploader.java:466)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.(YarnApplicationFileUploader.java:106)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.from(YarnApplicationFileUploader.java:381)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:789)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:592)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:458)
> ... 9 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink Temporal Join Two union Hive Table Error

2021-03-15 Post by macia kk
Hi, could someone please take a look at this problem?


Create the view promotionTable:

SELECT *, 'CN' as country, id as pid
FROM promotion_cn_rule_tab
UNION
SELECT *, 'JP' as country, id as pid
FROM promotion_jp_rule_tab


Flink SQL query:

SELECT t1.country, t1.promotionId, t1.orderId,
CASE WHEN t2.pid IS NULL THEN 'Rebate'
ELSE 'Rebate'
END AS rebate
FROM eventTable AS t1
LEFT JOIN promotionTable
/*+ OPTIONS('streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all',
'lookup.join.cache.ttl' = '5 m') */
FOR SYSTEM_TIME AS OF t1.procTime AS t2
ON t1.promotionId = t2.pid
AND t1.country = t2.country


If I drop the UNION of the Hive tables and keep only one country's table, the job runs successfully; but with the two tables UNIONed I get this error:

Caused by: org.apache.flink.table.api.ValidationException: Currently the
join key in Temporal Table Join can not be empty




Re: Why does Upsert Kafka require the format to be INSERT-ONLY?

2021-03-15 Post by Shengkai Fang
Hi.

For the table scan:
- +I and +U are both treated as insert messages; the changelog normalize operator then rewrites them to the correct row kind;
- when the scan sees a tombstone message, its value part is empty, so the row kind is set directly to delete, and changelog
normalize fills the value part back in;
- -U messages are never written into upsert-kafka.
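For reference, a minimal sketch of an upsert-kafka table declaration (topic and bootstrap servers are placeholders); note that key.format and value.format are ordinary insert-only formats, while the upsert/delete semantics (null value = tombstone) come from the connector itself:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# PRIMARY KEY is required by upsert-kafka; it determines the Kafka message key.
table_env.execute_sql("""
    CREATE TABLE user_balance (
        user_id STRING,
        balance DECIMAL(18, 2),
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'balance_topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")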

For more details, see the slides here[1].

Best,
Shengkai

[1] https://flink-learning.org.cn/developers/flink-training-course3/


刘首维 wrote on Mon, Mar 15, 2021 at 2:39 PM:

> Hi Shengkai,
>
>
> Thanks for the reply.
>
>
> Let me make sure I understand:
>
>    In ChangelogNormalize,
>
>   1.  the RowKind is not honored
>
>   2.  a null value represents a tombstone
>
>   3.  there is the overhead of keeping the full data set in state
>
>
>  If my understanding is correct, then any incoming event, whether +U/+I or -U/D, is interpreted as an insert, which causes the downstream to emit a record?
>
> We have several custom formats; to adapt them to Upsert Kafka, does the format need to emit records with value ==
> null for D (and -U) events?
>
>
>
>
> 
> From: Shengkai Fang 
> Sent: 2021-03-15 14:21:31
> To: user-zh@flink.apache.org
> Subject: Re: Why does Upsert Kafka require the format to be INSERT-ONLY?
>
> Hi.
>
> The original design was based on Kafka's compacted topics, and a compacted
> topic has its own way of expressing a changelog, e.g. a null value represents a tombstone
> message. From that angle, we interpret the data purely from Kafka's point of view rather than parsing the changelog from the format's point of view.
>
> This does introduce some issues, e.g. when reading data with upsert-kafka a state has to be maintained to remember every key that has been read.
>
> Best,
> Shengkai
>
> 刘首维 wrote on Mon, Mar 15, 2021 at 11:48 AM:
>
> > Hi all,
> >
> >
> >
> > I have been testing Upsert Kafka recently, and during validation I found that the format's changelog-mode
> > is required to be insert-only. What is the reason for this?
> >
> > If that's not the case, please correct me directly. Thanks.
> >
> >
> >
> >
> >
> > Flink version 1.12.1
> >
>


Re: Flink 1.12 streaming write to Hive fails: Failed to create Hive RecordWriter

2021-03-15 Post by yinghua...@163.com
Caused by: java.lang.OutOfMemoryError: Java heap space



yinghua...@163.com
 
From: william
Sent: 2021-03-15 16:32
To: user-zh
Subject: Flink 1.12 streaming write to Hive fails: Failed to create Hive RecordWriter
flink 1.12
hadoop 2.7.5
hive 2.3.6
 
Error message:
2021-03-15 16:29:43
org.apache.flink.connectors.hive.FlinkHiveException:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create
Hive RecordWriter
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:165)
at
org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:48)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:263)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:235)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
at
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$36.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException:
Failed to create Hive RecordWriter
at
org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:77)
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:157)
... 34 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:67)
... 35 more
Caused by: java.lang.OutOfMemoryError: Java heap space
 
 
 

Flink 1.12 streaming write to Hive fails: Failed to create Hive RecordWriter

2021-03-15 Post by william
flink 1.12
hadoop 2.7.5
hive 2.3.6

Error message:
2021-03-15 16:29:43
org.apache.flink.connectors.hive.FlinkHiveException:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create
Hive RecordWriter
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:165)
at
org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:48)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:263)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:235)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
at
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$36.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException:
Failed to create Hive RecordWriter
at
org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:77)
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:157)
... 34 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at

Re: A question about LATERAL TABLE in pyflink

2021-03-15 Post by 陈康
I've put together a simple reproducible example; please take a look when you have a chance. Thanks!




Re: Question about the output of a statement set

2021-03-15 Post by Dian Fu
Ah, then your understanding is wrong. There are actually two sub-cases here:
- sink1 and sink2 end up, via operator chaining, on the same physical node as the source: everything then runs in one JVM, and for each record the data is sent first to one sink and then to the other, before the next record is processed.
- sink1 and sink2 run on a different physical node from the source: in this case it is purely asynchronous; each record is sent separately over the network to sink1 and sink2, and because the network has buffers, the order in which sink1 and sink2 receive the data is completely non-deterministic.

But in neither case is all of the data first sent to one sink and only then all of it sent to the second sink.
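If one wants to observe the second (non-chained) case, operator chaining can be disabled so that the source and the two sinks run as separate tasks; a minimal sketch, assuming Flink 1.12+ (added for illustration, not part of the original reply):

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Disable operator chaining: the source and the print sinks now exchange records
# over the network, so the interleaving of the two sinks becomes non-deterministic.
table_env.get_config().get_configuration().set_string("pipeline.operator-chaining", "false")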

> On 2021-03-12 22:52, 刘杰鸿 wrote:
> 
> My understanding was that the code would first insert all of the source data into sink1, giving 1, 2; then, when the add_insert_sql code runs, all of the source data would be inserted into sink2, again giving 1, 2.
> So the output should be 1, 2, 1, 2.
> 
> -- Original message --
> From: "Dian Fu";
> Sent: Friday, March 12, 2021, 10:24 PM
> To: "user-zh"; "刘杰鸿";
> Subject: Re: Question about the output of a statement set
> 
> Could you explain why you think the output should be 1, 2, 1, 2?
> 
> Personally I think the current output is correct. For example, the first record is sent to sink1 and then to sink2, so two 1s are printed; then the second record is processed and two 2s are printed.
> 
> On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 wrote:
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> 
> env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> 
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
> table_env.create_temporary_view("simple_source", table)
> table_env.execute_sql("""
> CREATE TABLE first_sink_table (
> id BIGINT, 
> data VARCHAR 
> ) WITH (
> 'connector' = 'print'
> )
> """)
> table_env.execute_sql("""
> CREATE TABLE second_sink_table (
> id BIGINT, 
> data VARCHAR
> ) WITH (
> 'connector' = 'print'
> )
> """)
> # create a statement set
> statement_set = table_env.create_statement_set()
> # use the Table API to insert table into first_sink_table
> statement_set.add_insert("first_sink_table", table)
> # use SQL to insert table into second_sink_table
> statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM 
> simple_source")
> # execute the statements
> statement_set.execute().wait()
>
> After running the script above I get the output below; my understanding was that it should be 1, 2, 1, 2 rather than 1, 1, 2, 2:
> 4 +I(1,Hi)
> 4 +I(1,Hi)
> 4 +I(2,Hello)
> 4 +I(2,Hello)



Reply: Why does Upsert Kafka require the format to be INSERT-ONLY?

2021-03-15 Post by 刘首维
Hi Shengkai,


Thanks for the reply.


Let me make sure I understand:

   In ChangelogNormalize,

  1.  the RowKind is not honored

  2.  a null value represents a tombstone

  3.  there is the overhead of keeping the full data set in state


 If my understanding is correct, then any incoming event, whether +U/+I or -U/D, is interpreted as an insert, which causes the downstream to emit a record?

We have several custom formats; to adapt them to Upsert Kafka, does the format need to emit records with value == null for D (and -U) events?





From: Shengkai Fang 
Sent: 2021-03-15 14:21:31
To: user-zh@flink.apache.org
Subject: Re: Why does Upsert Kafka require the format to be INSERT-ONLY?

Hi.

The original design was based on Kafka's compacted topics, and a compacted
topic has its own way of expressing a changelog, e.g. a null value represents a tombstone
message. From that angle, we interpret the data purely from Kafka's point of view rather than parsing the changelog from the format's point of view.

This does introduce some issues, e.g. when reading data with upsert-kafka a state has to be maintained to remember every key that has been read.

Best,
Shengkai

刘首维 wrote on Mon, Mar 15, 2021 at 11:48 AM:

> Hi all,
>
>
>
> I have been testing Upsert Kafka recently, and during validation I found that the format's changelog-mode
> is required to be insert-only. What is the reason for this?
>
> If that's not the case, please correct me directly. Thanks.
>
>
>
>
>
> Flink version 1.12.1
>


Flink 1.12: submitting a job in yarn-application mode fails

2021-03-15 Post by todd
I submit the Flink job with a script; the command is:
/bin/flink run-application -t yarn-application 
-Dyarn.provided.lib.dirs="hdfs://xx/flink120/" hdfs://xx/flink-example.jar
--sqlFilePath   /xxx/kafka2print.sql

The libs used by Flink and the user jar have already been uploaded to the HDFS path, but the following error is thrown:
---
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
deploy Yarn Application Cluster
at
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
at
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
at
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: java.lang.IllegalArgumentException: Wrong FS:
hdfs://xx/flink120/, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:648)
at
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:428)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1425)
at
org.apache.flink.yarn.YarnApplicationFileUploader.lambda$getAllFilesInProvidedLibDirs$2(YarnApplicationFileUploader.java:469)
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at
org.apache.flink.yarn.YarnApplicationFileUploader.getAllFilesInProvidedLibDirs(YarnApplicationFileUploader.java:466)
at
org.apache.flink.yarn.YarnApplicationFileUploader.(YarnApplicationFileUploader.java:106)
at
org.apache.flink.yarn.YarnApplicationFileUploader.from(YarnApplicationFileUploader.java:381)
at
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:789)
at
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:592)
at
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:458)
... 9 more





Re: Why does Upsert Kafka require the format to be INSERT-ONLY?

2021-03-15 Post by Shengkai Fang
Hi.

The original design was based on Kafka's compacted topics, and a compacted
topic has its own way of expressing a changelog, e.g. a null value represents a tombstone
message. From that angle, we interpret the data purely from Kafka's point of view rather than parsing the changelog from the format's point of view.

This does introduce some issues, e.g. when reading data with upsert-kafka a state has to be maintained to remember every key that has been read.

Best,
Shengkai

刘首维 wrote on Mon, Mar 15, 2021 at 11:48 AM:

> Hi all,
>
>
>
> I have been testing Upsert Kafka recently, and during validation I found that the format's changelog-mode
> is required to be insert-only. What is the reason for this?
>
> If that's not the case, please correct me directly. Thanks.
>
>
>
>
>
> Flink version 1.12.1
>