Re: flinksql如何控制结果输出的频率
hi: 你可以自定义一个trigger [1] 第二个场景是可以的,第一种场景我没有遇到过这种场景,你可以试试。 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道: > 我有两个需求 > 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办? > 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?
Re: [udf questions]
ERROR log: . Job has been submitted with JobID 91ac323d4d5338418883240680192f34 Traceback (most recent call last): File "", line 1, in File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/table/table_environment.py", line 907, in execute File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/util/exceptions.py", line 147, in deco File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o23.execute. : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 91ac323d4d5338418883240680192f34) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 91ac323d4d5338418883240680192f34) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) ... 19 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, backoffTimeMS=6) at
flinksql如何控制结果输出的频率
我有两个需求 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办? 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?
Re: [udf questions]
感谢大佬回复。 根据邮件里面的提示下我尝试了如下操作: @udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING()) def str_add(str_name): return '1' table_env.register_function("str_add", str_add) table_env.sql_update("insert into flink_sinktable_ad_test_1 \ select \ str_add(topicid) AS topicid \ from \ flink_sourcetable_ad_test \ ") 目的:我的目的是想通过最简单的方式看看udf是否有生效。 结果:结果依赖没有数据流入近来。 其他手段和测试:我通过不使用udf来验证数据流是否正常的。结果正常。 所以能在分析下么?或者我应该如何深入的跟踪下? all code below: from pyflink.datastream import StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic from pyflink.table import StreamTableEnvironment, EnvironmentSettings,TableSink,TableConfig,DataTypes from pyflink.table.descriptors import Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json from pyflink.common import RestartStrategies from pyflink.table.udf import udf import json env = StreamExecutionEnvironment.get_execution_environment() ##contain设置 env.set_parallelism(12) env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6)) ##使用blink api environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() table_env = StreamTableEnvironment.create(env,environment_settings=environment_settings) table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \ host STRING, \ type STRING, \ topicid STRING, \ message STRING, \ proctime as PROCTIME() \ ) WITH ( \ 'connector.type' = 'kafka',\ 'connector.version' = 'universal', \ 'connector.topic' = 'advertise_module', \ 'connector.properties.zookeeper.connect' = 'localhost:2181', \ 'connector.properties.bootstrap.servers' = '172.25.80.134:9092', \ 'connector.properties.group.id' = 'flink_1.10_test_source', \ 'connector.startup-mode' = 'latest-offset', \ 'format.type' = 'json' \ )") table_env.sql_update("CREATE TABLE flink_sinktable_ad_test_1 ( \ topicid STRING \ ) WITH ( \ 'connector.type' = 'kafka',\ 'connector.version' = 'universal', \ 'connector.topic' = 'recommend_user_concern_test', \ 'connector.properties.zookeeper.connect' = 'localhost:2181', \ 'connector.properties.bootstrap.servers' = '172.25.82.77:9092', \ 'connector.properties.group.id' = 'flink_1.10_test_sink', \ 'connector.startup-mode' = 'latest-offset', \ 'connector.properties.retries' = '3', \ 'format.type' = 'json', \ 'connector.properties.update_mode' = 'append' \ )") @udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING()) def str_add(str_name): return '1' table_env.register_function("str_add", str_add) #table_env.register_function("str_add", udf(lambda i: i + '1', DataTypes.STRING(), DataTypes.STRING())) table_env.sql_update("insert into flink_sinktable_ad_test_1 \ select \ str_add(topicid) AS topicid \ from \ flink_sourcetable_ad_test \ ") table_env.execute('flink_1.10_test’) -- > 在 2020年3月26日,下午5:55,jincheng sun 写道: > > 比较明显的一个问题是UDF定义有些问题是你的sink表定义了一个STRING类型的字段,但是你SQL里面是SELECT了udf的结果,所以我想你的UDF应该也返回一个STRING,也就是result_type=DataTypes.STRING(),同时确保你udf真的返回的是一个字符串,不是json的Object。 > > Best, > Jincheng > > > WuPangang mailto:wpangang1...@icloud.com>> > 于2020年3月26日周四 下午5:24写道: > Data as below: > > {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23 > > 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/down-ddz.734399.com > >
回复:(无主题)
并发是200和400两种,集群有270多个节点,不过可用的vcores是6600多,内存是17T左右,看了执行图q43这个存在数据倾斜的的问题,失败的节点存在数据量偏大的情况 ---原始邮件--- 发件人: "Jingsong Li"
Re: (无主题)
Hi, - 是否是计算规模的问题? 集群大小合适吗?并发合适吗? - 是否是Plan不优的问题? Hive的表有做Analysis吗? CC: user Best, Jingsong Lee On Thu, Mar 26, 2020 at 8:27 PM 被惊艳的时光 <2521929...@qq.com> wrote: > > hello,你好,有个关于flink-sql-benchmark工具的问题需要请教下,在做tpc-ds测试时,当数据量达到4T时(flink版本1.10),q43,q67,q70这三条sql执行出错了,都是在hashjoin的时候失败啦,报错信息是hashjoin迭代的次数过多,不知道之前你们在测试时有没有出现这种情况 > -- Best, Jingsong Lee
Re: Re: flink 安装包的几个 jar 是怎么 build 出来的
统一对 Flink 项目源码进行编译打包,你会在 flink-dist 这个模块下面的 target 目录下面看到相关 Flink 命令行的一些东西,同时在lib 包下面, 会有一些 Flink Jar 包 Best wishes, 沈磊 godfrey he 于2020年3月26日周四 下午8:51写道: > 目前 flink 支持 Scala 2.11 和 Scala 2.12, 默认情况下,通过 mvn package 打包出来的是包含 Scala > 2.11 的包,例如 flink-table-blink_*2.11*-1.10.0.jar。 > 可以通过 -Dscala-2.12 指定 Scala 的版本是 2.12, 打出来的包是 flink-table-blink_*2.12* > -1.10.0.jar 这样的。 > > Best, > Godfrey > > wangl...@geekplus.com.cn 于2020年3月26日周四 > 下午6:34写道: > > > > > flink-table-uber-blink 下 > > mvn clean install -DskipTests -Dscala-2.12 -DskipTests > > > > 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的 > > > > 谢谢, > > 王磊 > > > > > > wangl...@geekplus.com.cn > > > > Sender: Kurt Young > > Send Time: 2020-03-26 18:15 > > Receiver: user-zh > > cc: jihongchao > > Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的 > > flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar) > > > > Best, > > Kurt > > > > > > On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn < > > wangl...@geekplus.com.cn> wrote: > > > > > > > > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar > > > 这个 jar 是从哪里 build 出来的呢? > > > > > > 我 clone github 上的源代码,mvn clean package > > > 我以为 flink-table/flink-table-planner-blink 目录下build 出的 > > > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的 > > > flink-table-blink_2.12-1.10.0.jar 是对应的 > > > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。 > > > > > > 谢谢, > > > 王磊 > > > > > > > > > > > > wangl...@geekplus.com.cn > > > > > > > > >
Flink 1.10: 关于 RocksDBStateBackend In background 状态清理机制的问题
Hi 社区的小伙伴, 我现在有个关于 Flink 1.10 RocksDBStateBackend 状态清理机制的问题,在 1.10中,RocksDB 默认使用 in background 方式进行状态清理,使用 compaction filter 方式。正如官方文档所说: > RocksDB compaction filter will query current timestamp, used to check > expiration, from Flink every time after processing certain number of state > entries. 现在有个疑问,RocksDB 在处理一定数量的 State Entrys 就会进行 compaction filter,那么这个 compaction filter 是针对这一定数量 State Entrys ,然后检查他们是否过期吗? 还是说,会针对一个 Task 当前所有的状态文件,统一进行 Compaction filter,在合并时,检查每个 entry,过期的状态 Key 就过滤删除掉。 这个地方我没有弄明白,非常期待你的回复。 Best wishes, 沈磊
Re: Re: flink 安装包的几个 jar 是怎么 build 出来的
目前 flink 支持 Scala 2.11 和 Scala 2.12, 默认情况下,通过 mvn package 打包出来的是包含 Scala 2.11 的包,例如 flink-table-blink_*2.11*-1.10.0.jar。 可以通过 -Dscala-2.12 指定 Scala 的版本是 2.12, 打出来的包是 flink-table-blink_*2.12* -1.10.0.jar 这样的。 Best, Godfrey wangl...@geekplus.com.cn 于2020年3月26日周四 下午6:34写道: > > flink-table-uber-blink 下 > mvn clean install -DskipTests -Dscala-2.12 -DskipTests > > 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的 > > 谢谢, > 王磊 > > > wangl...@geekplus.com.cn > > Sender: Kurt Young > Send Time: 2020-03-26 18:15 > Receiver: user-zh > cc: jihongchao > Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的 > flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar) > > Best, > Kurt > > > On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > > > > > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar > > 这个 jar 是从哪里 build 出来的呢? > > > > 我 clone github 上的源代码,mvn clean package > > 我以为 flink-table/flink-table-planner-blink 目录下build 出的 > > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的 > > flink-table-blink_2.12-1.10.0.jar 是对应的 > > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。 > > > > 谢谢, > > 王磊 > > > > > > > > wangl...@geekplus.com.cn > > > > >
Flink Weekly | 每周社区动态更新 - 2020/03/26
大家好,本文为 Flink Weekly 的第十期,由张成整理,主要内容包括:近期社区开发进展,邮件问题答疑以及社区直播和相关技术博客。 社区开发进展 [release] 关于发布 Flink 1.10.1 的讨论正在火热进行,最新消息请参考 Yu Li 发起的讨论。 [1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html [Checkpoint] Arvid Heise 发起 FLIP-76 的投票已经通过。FLIP-76 提出了一种基于检查点屏障的非阻塞对齐执行检查点的方法。 相关好处有: 即使某些 Operator 仍在等待正在输入通道上的检查点屏障,上游仍可以继续产生数据。 即使对于具有单个输入通道的 Operator,在整个执行图中的检查点次数也大大减少。 即使在不稳定的环境中,最终用户也将看到更多的进展,因为更及时的检查点将避免过多的重复计算。 促进更快地 rescaling。 更多信息参考: [2]https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints [3]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-76-Unaligned-checkpoints-td33651.html [Connectors/Filesystem] 删除 BucketingSink。BucketingSink 已经在 Flink 1.9 版本标记为过期。Flink 有一个新的 StreamingFileSink 替代 BucketingSink。目前 StreamingFileSink 的 scala 版本存在 bug。 [4]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-16616-Drop-BucketingSink-td38950.html [5]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Bucketing-Sink-td38830.html#a38831 [6]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-16684-StreamingFileSink-builder-does-not-work-with-Scala-td39109.html [Table API & SQL] Jingsong Li 发起了引入 StatefulSequenceSource 的讨论。这个能够方便用户更好的进行测试 SQL。最终讨论决定在 Table 支持 DataGenerator 的 source、Print 的 sink 和blackhole 的 sink。 [7]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-TableFactory-for-StatefulSequenceSource-td39116.html [sql] Timo 分享了一个关于新的 TableSource 和 TableSink 接口的提案(FLIP-95)。Jark、Dawid、Aljoscha、Kurt、Jingsong 等参考了讨论。其目标是简化当前的接口架构,以支持变更日志源(FLIP-105)和删除对 DataStream API 和 planner 的依赖。 [8]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces [hadoop]跟进 Stephan 和 Till 的讨论。Sivaprasanna 分享了 Hadoop 相关实用程序组件的概述,以开始讨论将其移动到单独的模块中 “flink-hadoop-utils”。 [9]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SerializableHadoopConfiguration-td38371.html [10]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-a-new-module-flink-hadoop-utils-td39107.html 用户问题 叶贤勋在使用 Hive Source 的时候遇到了 Kerberos 认证的问题,社区同学进行了相关的讨论和建议,感兴趣的同学可以参考如下链接: [11]http://apache-flink.147419.n8.nabble.com/Hive-Source-With-Kerberos-td1688.html hiliuxg 在社区提问 Flink SQL 如何支持每隔 5 分钟触发当日零点到当前 5 分钟的聚合计算。Jark Wu 和 Tianwang Li 进行了相关解答。 [12]http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html hiliuxg 在社区提问 Flink SQL COUNT DISTINCT 性能优化。Benchao Li、田志声、Lucas Wu、Lake Shen 展开了一些讨论,有兴趣的同学可以参考如下链接: [13]http://apache-flink.147419.n8.nabble.com/flink-sql-td2012.html 王志华 在社区提问 Flink DDL 如何支持自定义 Source/Sink 表。社区同学在邮件中进行了详细的回答。 [14]http://apache-flink.147419.n8.nabble.com/ddl-td1959.html 111 在社区提问 Flink SQL1.10 大表 join 如何优化?Jark Wu、Kurt Young 和 Jingsong Lee 进行了详细的解答。目前 Flink SQL 的并行度(非 Source )并不是自动推断出来的,需要通过设置table.exec.resource.default-parallelism,详细的内容参考: [15]http://apache-flink.147419.n8.nabble.com/Flink-SQL1-10-join-td2044.html [16]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-How-can-i-set-parallelism-in-clause-of-group-by-td33736.html Aaron Levin 在社区提问 如何能够做到修改任务的并发,然后从 checkpoint 启动任务。Piotr Nowojski、Till Rohrmann 参与了相关讨论。内容涉及到 unaligned checkpoints (FLIP-76) 对savepoint 和 checkpoint 的影响。同时 Lake Shen 也提出了类似的问题。有兴趣的同学可以参考 [17]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Expected-behaviour-when-changing-operator-parallelism-but-starting-from-an-incremental-checkpoint-td33608.html [18]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cancel-the-flink-task-and-restore-from-checkpoint-can-I-change-the-flink-operator-s-parallelism-td33613.html Jiawei Wu 在社区提问“如何使用 Flink SQL 计算 按照供应商分组同时入库时间大于 15 天的库存数据?”,有兴趣的同学可以参考: [19]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-flink-to-calculate-sum-of-the-inventory-under-certain-conditions-td33323.html Vinod Mehra 在社区提出了一个关于 Join 相关的问题。这个问题比较复杂,Timo Walther 进行了相关解答。里面涉及到了一些如何进行 Flink SQL 问题的排查。有兴趣的同学可以参考: [20]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/time-windowed-joins-and-tumbling-windows-td33551.html 活动博客文章及其他 SQL 开发任务超 50% !滴滴实时计算的演进与优化 [21]https://ververica.cn/corporate_practice/evolution-and-optimization-of-didi-real-time-computing/ Flink 生态:一个案例快速上手 PyFlink [22]https://ververica.cn/developers/pyflink-a-case-in-hand/ 一套 SQL 搞定数据仓库?Flink有了新尝试[23]https://ververica.cn/developers/a-set-of-sql-to-handle-data-warehouse/ 如何在 Flink 中规划 RocksDB 内存容量? [24]https://ververica.cn/developers/how-to-plan-the-memory-capacity-of-rocksdb-in-flink/
Re: Re: flink 安装包的几个 jar 是怎么 build 出来的
flink-table-uber-blink 下 mvn clean install -DskipTests -Dscala-2.12 -DskipTests 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-26 18:15 Receiver: user-zh cc: jihongchao Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的 flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar) Best, Kurt On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar > 这个 jar 是从哪里 build 出来的呢? > > 我 clone github 上的源代码,mvn clean package > 我以为 flink-table/flink-table-planner-blink 目录下build 出的 > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的 > flink-table-blink_2.12-1.10.0.jar 是对应的 > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > >
Re: 回复: Flink JDBC Driver是否支持创建流数据表
还有一种方式是sql gateway 支持 --jar 和 --library 指定用户的jar,这种方式不需要用户将jar放到flink的lib下 godfrey he 于2020年3月25日周三 下午6:24写道: > hi 赵峰, > > 你出现的这个问题,是在classpath中找不到Kafka相关TableFactory,按照zhenghua说的方式可以解决。但是现在Flink > JDBC Driver只支持Batch模式,而Kafka table source目前只支持stream模式。 > > Best, > Godfrey > > Zhenghua Gao 于2020年3月25日周三 下午4:26写道: > >> 请确认一下 kafka connector 的jar包是否在 flink/lib 下。 >> 目前的报错看起来是找不到kafka connector的jar包。 >> >> *Best Regards,* >> *Zhenghua Gao* >> >> >> On Wed, Mar 25, 2020 at 4:18 PM 赵峰 wrote: >> >> > 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中 >> > >> > >> > >> > >> > 参考下这个文档: >> > >> > >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector >> > 下面的语法应该是不支持的: >> > 'format.type' = 'csv',\n" + >> > "'format.field-delimiter' = '|'\n" >> > >> > 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90} >> > tEnv.sqlUpdate("CREATE TABLE pick_order (\n" >> > + "order_no VARCHAR,\n" >> > + "status INT\n" >> > + ") WITH (\n" >> > + "'connector.type' = 'kafka',\n" >> > + "'connector.version' = 'universal',\n" >> > + "'connector.topic' = 'wanglei_test',\n" >> > + "'connector.startup-mode' = 'latest-offset',\n" >> > + "'connector.properties.0.key' = 'zookeeper.connect',\n" >> > + "'connector.properties.0.value' = 'xxx:2181',\n" >> > + "'connector.properties.1.key' = 'bootstrap.servers',\n" >> > + "'connector.properties.1.value' = 'xxx:9092',\n" >> > + "'update-mode' = 'append',\n" >> > + "'format.type' = 'json',\n" >> > + "'format.derive-schema' = 'true'\n" >> > + ")"); >> > >> > 王磊 >> > >> > >> > wangl...@geekplus.com.cn >> > 发件人: 赵峰 >> > 发送时间: 2020-03-24 21:28 >> > 收件人: user-zh >> > 主题: Flink JDBC Driver是否支持创建流数据表 >> > hi >> > >> > Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: >> > Connection connection = >> > >> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); >> > Statement statement = connection.createStatement(); >> > statement.executeUpdate( >> > "CREATE TABLE table_kafka (\n" + >> > "user_id BIGINT,\n" + >> > "item_id BIGINT,\n" + >> > "category_id BIGINT,\n" + >> > "behavior STRING,\n" + >> > "ts TIMESTAMP(3),\n" + >> > "proctime as PROCTIME(),\n" + >> > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + >> > ") WITH (\n" + >> > "'connector.type' = 'kafka', \n" + >> > "'connector.version' = 'universal', \n" + >> > "'connector.topic' = 'flink_im02', \n" + >> > "'connector.properties.group.id' = 'flink_im02_new',\n" + >> > "'connector.startup-mode' = 'earliest-offset', \n" + >> > "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + >> > "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + >> > "'format.type' = 'csv',\n" + >> > "'format.field-delimiter' = '|'\n" + >> > ")"); >> > ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); >> > while (rs1.next()) { >> > System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); >> > } >> > statement.close(); >> > connection.close(); >> > 报错: >> > Reason: Required context properties mismatch. >> > The matching candidates: >> > org.apache.flink.table.sources.CsvBatchTableSourceFactory >> > Mismatched properties: >> > 'connector.type' expects 'filesystem', but is 'kafka' >> > 赵峰 >> > >> > >> > Quoted from: >> > >> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html >> > >> > >> > >> > >> > 赵峰 >> >
Re: flink 安装包的几个 jar 是怎么 build 出来的
flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar) Best, Kurt On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar > 这个 jar 是从哪里 build 出来的呢? > > 我 clone github 上的源代码,mvn clean package > 我以为 flink-table/flink-table-planner-blink 目录下build 出的 > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的 > flink-table-blink_2.12-1.10.0.jar 是对应的 > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > >
Re: [udf questions]
比较明显的一个问题是UDF定义有些问题是你的sink表定义了一个STRING类型的字段,但是你SQL里面是SELECT了udf的结果,所以我想你的UDF应该也返回一个STRING,也就是result_type=DataTypes.STRING(),同时确保你udf真的返回的是一个字符串,不是json的Object。 Best, Jincheng WuPangang 于2020年3月26日周四 下午5:24写道: > Data as below: > > {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23 > 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/ > down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/ > (PRA-AL00X; Android; Android OS ; 8.0.0; zh) > ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/ > down-ddz.734399.com > \\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/ > p12.jmstatic.com > \\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api”} > Problem: > 数据是个嵌套json,并且核心字段message的格式不能直接通过table api json 相关的方法来处理。 > 自己思考的解决思路:通过udf, 使用json.loads来处理。 > 实际使用中遇到的问题: job提交之后,在dashboard上发现bytes received,records recevied > 都是0;下游是同步给Kafka,去消费下游kafka也没有数据。 > > Code as below: > from pyflink.datastream import > StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic > from pyflink.table import StreamTableEnvironment, > EnvironmentSettings,TableSink,TableConfig,DataTypes > from pyflink.table.descriptors import > Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json > from pyflink.common import RestartStrategies > from pyflink.table.udf import udf > import json > > env = StreamExecutionEnvironment.get_execution_environment() > #env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > ##checkpoint设置 > #env.enable_checkpointing(30) > > #env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE) > #env.get_checkpoint_config().set_min_pause_between_checkpoints(3) > #env.get_checkpoint_config().set_checkpoint_timeout(6) > #env.get_checkpoint_config().set_max_concurrent_checkpoints(1) > #env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False) > ##contain设置 > env.set_parallelism(12) > env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6)) > ##使用blink api > environment_settings = > EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() > table_env = >
flink 安装包的几个 jar 是怎么 build 出来的
单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar 这个 jar 是从哪里 build 出来的呢? 我 clone github 上的源代码,mvn clean package 我以为 flink-table/flink-table-planner-blink 目录下build 出的 flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的 flink-table-blink_2.12-1.10.0.jar 是对应的 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。 谢谢, 王磊 wangl...@geekplus.com.cn
Re: flink1.10 & pyflink相关问题咨询
看你错误日志运行的示例使用了PyUDFDemoConnector,也就是参考的博客[1], 在写这个博客时候1.10还没有发布,在发布之后接口有变化,所以PyUDFDemoConnector有个问题,我前两天进行了更新。你可以更新一下JAR。 另外你发的问题很久之前,发布1.10之前已经fix了[2],所以你更新一下connector在测试一下看看。有问题继续沟通。 Best, Jincheng [1] https://enjoyment.cool/2019/12/05/Apache-Flink-%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-%E5%A6%82%E4%BD%95%E5%9C%A8PyFlink-1-10%E4%B8%AD%E8%87%AA%E5%AE%9A%E4%B9%89Python-UDF/ [2] https://issues.apache.org/jira/browse/FLINK-14581 zilong xiao 于2020年3月25日周三 下午12:19写道: > 是的,有一个关键步骤:`source > py36/bin/activate`是在文档中未体现的,执行该步骤后提交到yarn集群可以正常工作,然后最近在进一步研究1.10对于udf的支持,在尝试提交udf作业时,会出现如下异常: > > Caused by: java.io.IOException: Cannot run program > "xxx/pyflink-udf-runner.sh": error=2, No such file or directory > > 提交作业前的操作如下: > 1.pip install virtualenv > 2.virtualenv --always-copy venv > 3.venv/bin/pip install apache-beam==2.15.0 > 4.venv/bin/pip install apache-flink > 5.venv/bin/pip install pydemo.tar.gz > 6.zip -r venv.zip venv > 7.bin/flink run -pyarch venv.zip -pyexec venv.zip/venv/bin/python -py > ./word_count_socket.py -j pydemo.jar > > 不知道前辈是否有遇到过类似情况呢? > > 完整异常栈信息 & 作业见附件 > > jincheng sun 于2020年3月19日周四 下午12:08写道: > >> 开心看到你在使用PyFlink 1.10,您遇到的问题,核心问题和将解决方式如下: >> >> 1. >> 利用shell的alias功能更改python命令指向是无效的,因为flink不通过shell启动Python进程。所以对flink来说本地python环境依然是python2. >> 2. 可以通过virtualenv, conda等工具创建python3.5+的环境,并激活,在激活了的环境下提交python job。 比如: >> pip install virtualenv >> virtualenv --python /usr/local/bin/python3 py36 >> source py36/bin/activate >> flink run -py pyflink.py >> 3. 另外也可以修改python命令的软链接,令其指向python3.5+。 >> >> 你可以尝试一下,有问题随时邮件交流! >> >> Best, >> 孙金城(金竹) >> >> >> >> zilong xiao 于2020年3月18日周三 下午12:14写道: >> >>> hi,金竹前辈您好,我是一名从事实时计算方向的IT工作者,最近在使用flink1.10 & >>> pyflink时遇到一点问题,希望能加下您的钉钉或者其他联系方式和您进一步交流,问题大概描述如下: >>> >>> 任务提交环境: >>> Apache-beam:2.15.0 >>> 本地python:2.7(已配置python3.7,通过修改~/.zshrc,alias >>> python='/usr/local/bin/python3.7') >>> pip:20.0.2 >>> flink:1.10 >>> >>> 提交命令:bin/flink run -pyarch tmp/venv.zip -pyexec >>> tmp/venv.zip/venv/bin/python3 -py word_count.py >>> >>> 在本地尝试以pre-job模式部署作业时,发现会提示如下报错,导致任务提交失败 >>> >>> RuntimeError: Python versions prior to 3.5 are not supported for PyFlink >>> [sys.version_info(major=2, minor=7, micro=16, releaselevel='final', >>> serial=0)]. >>> >>> >>> 显而易见,正如flink官方文档所说flink1.10作业必须要求python3.5+,我通过-pyarch >>> -pyexec来指定任务执行环境以及解释器环境,发现这两个指令貌似没生效,或者说没有作用,还是会有如上异常,具体执行过程都是参考您的文档: >>> https://enjoyment.cool/2020/01/02/Apache-Flink-%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-PyFlink-%E4%BD%9C%E4%B8%9A%E7%9A%84%E5%A4%9A%E7%A7%8D%E9%83%A8%E7%BD%B2%E6%A8%A1%E5%BC%8F/#more >>> 来操作的,我在想可能还是我的打开方式不对,亦或该指令还存在隐藏问题?可是网上也没有太多的资料,所以希望能和前辈您交流交流,帮我解开这个疑惑,期待前辈您的回复。 >>> >>
Re: Flink 1.10 JSON 解析
Hi 张宇 看起来是TypeMappingUtils中校验字段物理类型和逻辑类型的bug。 开了一个issue: https://issues.apache.org/jira/browse/FLINK-16800 *Best Regards,* *Zhenghua Gao* On Fri, Mar 20, 2020 at 5:28 PM 宇张 wrote: > hi, > 了解了,我重新整理一下: > streamTableEnv > .connect( > new Kafka() > .version("0.11") > .topic("mysql_binlog_test") > .startFromEarliest() > .property("zookeeper.connect", > "localhost:2181") > .property("bootstrap.servers", > "localhost:9092") > ) > .withFormat( > new Json() > ) > .withSchema( > new Schema() > .field("business", DataTypes.STRING()) > .field("data", DataTypes.ARRAY( > DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()), > DataTypes.FIELD("vendor_id", > DataTypes.DOUBLE()), > DataTypes.FIELD("status", > DataTypes.BIGINT()), > DataTypes.FIELD("create_time", > DataTypes.BIGINT()), > DataTypes.FIELD("tracking_number", > DataTypes.STRING()), > DataTypes.FIELD("invoice_no", > DataTypes.STRING()), > DataTypes.FIELD("parent_id", > DataTypes.BIGINT() > .field("database", DataTypes.STRING()) > .field("old", > DataTypes.ARRAY(DataTypes.ROW(DataTypes.FIELD("logistics_status", > DataTypes.DECIMAL(38,18) > .field("table", DataTypes.STRING()) > .field("ts", DataTypes.BIGINT()) > .field("type", DataTypes.STRING()) > .field("putRowNum", DataTypes.BIGINT()) > ) > .createTemporaryTable("Test"); > 这里面old复合字段里面子字段的类型使用DECIMAL时抛出异常,采用其他类型是可以的; > 异常: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type ARRAY> of table field 'old' > does not match with the physical type ARRAY LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return > type. > at > > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > at > > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277) > at > > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254) > at > > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157) > at > org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110) > at > > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > at > > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > at > > org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > > org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214) > at > > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192) > at > > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) > at > >
Re: Re: Re: Re: 向您请教pyflink在windows上运行的问题,我第一次接触flink。
第一行错误信息是没有安装 bash ? xu1990xaut 于2020年3月26日周四 下午12:12写道: > 孙老师,我按照您视频里的方法把flink包安装好了。 但是运行您提供得demo时出现下面这个错误。 我在网上找了好久还是没解决。 > 望老师再指点指点。 > > > > > > 在 2020-03-25 15:47:49,"jincheng sun" 写道: > > 哦,PyFlink目前不支持windows。 > > Best, > Jincheng > - > Twitter: https://twitter.com/sunjincheng121 > - > > > xu1990xaut 于2020年3月25日周三 下午2:55写道: > >> 谢谢孙老师。 我用的就是这个示例。另外我看到python下又两个flink版本,一个是import flink,一个是import >> pyflink。 pyflink是不是不能在windows下运行? >> python下的flink我确定是安装正确的。 >> 运行flink是也启动了start-cluster.bat(start-clust.sh),但是pycharm控制台很久不出结果,cpu的占用率也正常。 >> 我实在不知道是哪里问题。 >> >> >> >> >> >> 在 2020-03-25 14:44:25,"jincheng sun" 写道: >> >> 上面视频中对应的word_count示例的源码应该是这个: >> https://github.com/sunjincheng121/enjoyment.code/blob/master/myPyFlink/enjoyment/word_count.py运行完成之后计算结果应该是写到sink_file >> = 'sink.csv'文件里面去了。你可以将这个文件的路径打印出来,查看这个文件内容。 >> >> 另外如果您只是为了学习入门的话,建议你查阅[1][2], 我让想整理了解PyFlink最新的状况,可以查看[3]。 >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/installation.html >> [2] >> https://enjoyment.cool/2020/01/22/Three-Min-Series-How-PyFlink-does-ETL/#more >> [3] >> https://www.bilibili.com/video/BV1W7411o7Tj?from=search=14518199503613218690 >> >> Best, >> Jincheng >> - >> Twitter: https://twitter.com/sunjincheng121 >> - >> >> >> xu1990xaut 于2020年3月25日周三 下午2:23写道: >> >>> 孙老师您好,我之前在网上看的是这个视频《【Apache Flink 进阶教程】14课. Apache Flink Python API >>> 的现状及未来规划》。 今天我也在虚拟机下试了,还是无法运行。 >>> 我用的是flink1.10,python3.6。 麻烦老师指点指点。 >>> >>> >>> >>> >>> >>> >>> 在 2020-03-25 11:32:29,"jincheng sun" 写道: >>> >>> 很高兴收到您的邮件,我不太确定您具体看的是哪一个视频,所以很难确定您遇到的问题原因。您可以参考官方文档[1], >>> 同时我个人博客里有一些3分钟的视频和文档入门教程[2],您可以先查阅一下。如又问题,可以保持邮件沟通,可以在我的博客留言! >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/tutorials/python_table_api.html >>> [2] https://enjoyment.cool/ >>> >>> Best, >>> Jincheng >>> >>> >>> >>> xu1990xaut 于2020年3月24日周二 下午11:36写道: >>> 您好,之前在哔哩哔哩上看过您讲的视频。 也跟着视频动手做了。 我用的flink1.10,在pip的时候是直接pip install apache-flink,结果默认就是1.10版本。 然后我在pycharm中运行word-count这个脚本时,一直不出结果,也不报错。 请问这是什么原因。 我也装了jdk,另外页面访问flink8081那个端口也可以出来界面。 我是第一次接触flink,在网上也搜过这个问题, 可是一直没有得到答案。 麻烦您,给小弟指点指点, 谢谢您了。 >>> >>> >>> >>> >> >> >> >> > > > >
订阅用户邮件列表
你好! 我想订阅用户邮件列表,关注及解答用户问题,谢谢!!
NetworkBufferPool的使用
Hi: 观察flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments的值发现这个值很大,也就是说NetworkBufferPool还很充裕,可我的任务还是发生了背压告警。 请问各位大佬这是为什么呢?