flink-1.11.2
./bin/start-cluster.sh to start the cluster, and then
./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname
localhost --port
But on the jobmanager page, the task stays in SCHEDULED state, and after a while it prints an error:
2021-06-18 13:34:26,683 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
I'm now using a custom Flink source to read one HBase table; the table has about 30 million rows in total. After processing, the data is written to both ES and HBase, but every time, once roughly 10 million rows have been written, backpressure appears. I first suspected ES, but writing to HBase alone shows the same problem, and once it occurs nothing gets written at all anymore. There are no exceptions in the logs; see the attachment for details. Both the ES and HBase writes are batched. Source and sink parallelism are both 1, and the map operator in between has parallelism 16. Could some expert point me in the right direction?
totorobabyfans
邮箱:totorobabyf...@163.com
Hi, I'd like to ask a question:
flink1.12.2 sql BlinkPlanner
When using a rowtime-based session window, if no new data arrives within the configured 10-minute gap, the window does not close and emit its result; the previous session window only closes and emits once new data arrives more than 10 minutes later. What could cause the window not to close when the gap elapses without new data?
Thanks~
flink 1.11.2 on a single host.
./bin/start-cluster.sh and then
./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname
localhost --port
But on the jobmanager UI, the task is always in CREATED state. There are
available slots.
Any insights on this?
Thanks,
Lei
OK, thanks Jark~
Jark Wu wrote on Fri, Jun 18, 2021 at 10:59 AM:
> You can look at the AsyncWaitOperator source code for the implementation.
>
> Best,
> Jark
>
> On Tue, 15 Jun 2021 at 18:53, zilong xiao wrote:
>
> > I'd like to understand how 100% ordering is guaranteed here; it seems an async query cannot guarantee the order in which results come back, e.g. due to the network.
> >
> > Jingsong Li wrote on Tue, Jun 15, 2021 at 5:07 PM:
> >
> > > It is ordered.
> > >
> > > Unordered mode is not supported yet,
You can look at the AsyncWaitOperator source code for the implementation.
Best,
Jark
On Tue, 15 Jun 2021 at 18:53, zilong xiao wrote:
> I'd like to understand how 100% ordering is guaranteed here; it seems an async query cannot guarantee the order in which results come back, e.g. due to the network.
>
> Jingsong Li wrote on Tue, Jun 15, 2021 at 5:07 PM:
>
> > It is ordered.
> >
> > Unordered mode is not supported yet; right now it could affect the correctness of the streaming computation
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 15, 2021 at
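For reference, a minimal sketch of the ordered mode being discussed, at the DataStream async I/O level (MyAsyncLookup is a hypothetical stand-in for a real async client call):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class OrderedAsyncExample {

    // Hypothetical async lookup; stands in for a real async client call.
    static class MyAsyncLookup extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> key + "-enriched")
                .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "b", "c");

        // orderedWait buffers completed results inside AsyncWaitOperator and
        // emits them in input order, even when later requests finish first.
        AsyncDataStream
            .orderedWait(input, new MyAsyncLookup(), 30, TimeUnit.SECONDS, 100)
            .print();

        env.execute("ordered-async");
    }
}

That buffering is what preserves ordering despite out-of-order network responses, at the cost of some extra latency and queue space.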
To unsubscribe, please send mail to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org
Best,
Jark
On Thu, 17 Jun 2021 at 09:29, wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:
>
> Email address changed, unsubscribe!
>
>
>
>
To unsubscribe, please send mail to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org
Best,
Jark
On Tue, 15 Jun 2021 at 23:56, frank.liu wrote:
> Unsubscribe
>
>
> frank.liu
> frank...@163.com
The community recently redesigned the mysql-cdc implementation: the snapshot phase now supports parallel reads and checkpoints, and the global-lock dependency has been removed.
You can follow the GitHub repository for updates: https://github.com/ververica/flink-cdc-connectors.
The related design and implementation will also be presented at the July meetup, stay tuned.
Best,
Jark
On Thu, 17 Jun 2021 at 09:34, casel.chen wrote:
> When will Flink CDC support changing the parallelism, for fine-grained resource control? At the moment I'm also running into flink sql
>
Thanks for the report, Yidan.
It will be fixed by FLINK-23024 and will hopefully land in 1.13.2.
Best,
Yangze Guo
On Fri, Jun 18, 2021 at 10:00 AM yidan zhao wrote:
>
> Yeah, I also think it is a bug.
>
> Arvid Heise wrote on Thu, Jun 17, 2021 at 10:13 PM:
> >
> > Hi Yidan,
> >
> > could you check if the
Hi, have you solved this problem? I'm now running into the same issue.
Yeah, I also think it is a bug.
Arvid Heise wrote on Thu, Jun 17, 2021 at 10:13 PM:
>
> Hi Yidan,
>
> could you check if the bucket exists and is accessible? Seems like this
> directory cannot be created
> bos://flink-bucket/flink/ha/opera_upd_FlinkTestJob3/blob.
>
> The second issue looks like a bug. I will
Thanks Jing!
On Wed, Jun 16, 2021 at 11:30 PM JING ZHANG wrote:
> Hi Dan,
> It's better to split the Kafka partition into multiple partitions.
> Here is a way to try without splitting the Kafka partition. Add a
> rebalance shuffle between source and the downstream operators, set multiple
>
Environment: Flink 1.12 & CDH 6.1.1
Problem:
When submitting with yarn-per-job, initializing the HDFS client fails. It looks like a Hadoop version compatibility issue, although from the stack trace the correct client jar seems to be on the classpath.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
the classpath, or some classes are missing
To find out whether a connector supports source and sink, you only need to check whether its XXXDynamicTableFactory implements the DynamicTableSourceFactory and DynamicTableSinkFactory interfaces; but in the source case, how do you further determine whether it supports lookup?
public DynamicTableSource createDynamicTableSource(Context context)
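A minimal sketch of the check, assuming you already hold the factory and its context: the source supports lookup if the DynamicTableSource it creates implements LookupTableSource.

import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

public class LookupCheck {
    // Lookup joins need LookupTableSource, scans need ScanTableSource;
    // a single source may implement both.
    public static boolean supportsLookup(DynamicTableSourceFactory factory,
                                         DynamicTableFactory.Context context) {
        DynamicTableSource source = factory.createDynamicTableSource(context);
        return source instanceof LookupTableSource;
    }

    public static boolean supportsScan(DynamicTableSourceFactory factory,
                                       DynamicTableFactory.Context context) {
        DynamicTableSource source = factory.createDynamicTableSource(context);
        return source instanceof ScanTableSource;
    }
}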
Hello,
We are exploring running multiple Flink clusters within a Kubernetes cluster
such that each Flink cluster can run with a specified Flink image version.
Since the Flink Job Graph needs to be compatible with the Flink version running
in the Flink cluster, this brings a challenge in how we
Hi Arvid,
I see what you mean; no solution in Flink will be able to account for the
different variations in which applications may want to pass in parameters
or the external processes or events that introspect wherever the Flink
process happens to run. I do think there is an opportunity to
Thanks for sharing,
I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never
restored" is logged)
- on subsequent attempts, it again returns false, because the state
was never updated before
Adding
if
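A minimal sketch of the kind of update being suggested, with a hypothetical restart counter kept in operator list state (all names illustrative):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class RestartCounting implements CheckpointedFunction {

    private transient ListState<Integer> restartsState;
    private int restarts;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restartsState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("restarts", Integer.class));
        if (context.isRestored()) {
            for (Integer r : restartsState.get()) {
                restarts = Math.max(restarts, r);
            }
            restarts++; // count this restore as a restart
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Without writing the state here, it stays empty forever and a
        // later restore has nothing to find.
        restartsState.clear();
        restartsState.add(restarts);
    }
}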
Hi,
I am using application mode.
Thanks,
Qihua
On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise wrote:
> Hi Qihua,
>
> Which execution mode are you using?
>
> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang wrote:
>
>> Hi,
>>
>> Thank you for your reply. What I want is a Flink app with multiple jobs,
>>
Hi Qihua,
Which execution mode are you using?
On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang wrote:
> Hi,
>
> Thank you for your reply. What I want is a Flink app with multiple jobs, each
> job managing a stream. Currently our flink app has only 1 job that manages
> multiple streams.
> I did try
Hi Jose,
Masking secrets is a recurring topic where ultimately you won't find a good
solution. Your secret might for example appear in a crash dump or on some
process monitoring application. To mask reliably you'd either need specific
application knowledge (every user supplies arguments
Hi,
Thank you for your reply. What I want is a Flink app with multiple jobs, each
job managing a stream. Currently our flink app has only 1 job that manages
multiple streams.
I did try env.executeAsync(), but it still doesn't work. From the log, when
the second executeAsync() was called, it shows " *Job
I need to bootstrap a keyed process function.
So, I was hoping to use the Table SQL API because I thought it could
parallelize the work more efficiently via partitioning.
I need to bootstrap keyed state for a keyed process function with
Flink 1.12.1, thus I think I am required to use the DataSet
Sure, here it is. Nothing is mocked. I double-checked.
public class UnitTestClass {

    protected static LocalFlinkMiniCluster flink;

    @BeforeClass
    public static void prepare() {
        flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
        flink.start();
Hi,
Could you please share the test code?
I think the returned value might depend on the level on which the
tests are executed. If it's a regular job then it should return the
correct value (as with cluster). If the environment in which the code
is executed is mocked then it can be false.
I didn't know that I don't need to implement CheckpointedFunction if I use
ListState. However, I considered this answer (
https://stackoverflow.com/a/47443537/2096986) where Fabian says:
"You can store parts of the operator state also in the ListState (instead
of holding it on the heap) but it
Started in standalone mode on a single machine; after submitting a job:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources.
Slots are sufficient.
I'm using flink-1.11.2, and I compared it against
Did some more digging.
1) is not an option as we are not doing any cleanups at the moment. We keep
the last 4 checkpoints per job + all the savepoints.
2) I looked at job deployments that happened 1 week before the incident. We
have 23 deployments in total and each resulted in a unique job id. I
Hi,
env.execute("Job 1"); is a blocking call. You either have to use
executeAsync or use a separate thread to submit the second job. If Job 1
finishes then this would also work by having sequential execution.
However, I think what you actually want to do is to use the same env with 2
topologies
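A minimal sketch of the executeAsync variant (two topologies built on the same env; job names illustrative):

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoJobs {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();
        // Submits the first topology and returns immediately with a handle,
        // so building and submitting the second job is not blocked.
        JobClient job1 = env.executeAsync("Job 1");

        env.fromElements("a", "b").print();
        JobClient job2 = env.executeAsync("Job 2");

        System.out.println("submitted: " + job1.getJobID() + ", " + job2.getJobID());
    }
}

Note that whether two concurrent jobs are accepted also depends on the deployment mode; the application-mode discussion above is exactly about that.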
Let's start in reverse: you don't need to implement CheckpointedFunction if
you use managed state (ListState is managed).
Now to the question of how you should implement onTimer. That's up to you
and heavily depends on your use case.
The first onTimer implementation is called 60s after an element
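A minimal sketch of that shape, assuming a per-key buffer flushed 60s after an element arrives; the keyed ListState is managed, so no CheckpointedFunction is involved:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingFunction extends KeyedProcessFunction<String, String, String> {

    private transient ListState<String> buffer;

    @Override
    public void open(Configuration parameters) {
        // Managed keyed state: checkpointed and restored automatically.
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        buffer.add(value);
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 60_000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Fires 60s after an element was seen for this key; flush the buffer.
        for (String v : buffer.get()) {
            out.collect(v);
        }
        buffer.clear();
    }
}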
Yes, I have state on the ProcessFunction. I tested it on a stand-alone
cluster and it returns true when the application recovers. However, in
integration tests it does not return true. I am using Flink 1.4. Do you
know where it says that about Flink release 1.13 (
Thanks both for the suggestions, all good ideas. I will try some of the
profiling suggestions and report back.
On Thu, Jun 17, 2021 at 4:13 PM Yun Tang wrote:
> Hi Padarn,
>
> From my experiences, de-/serialization might not consume 3x CPU usage, and
> the background compaction could also
Does your ProcessFunction have state? If not it would be in line with the
documentation.
Also which Flink version are you using? Before Flink 1.13 empty state was
omitted so I could imagine that `isRestored()` would return false but it
should actually now also return true for empty state.
On Tue,
This could be a bug but I'd need to see more of the DataStream code to be
sure. Could you share that code?
On Sat, Jun 12, 2021 at 9:56 PM Ken Krugler
wrote:
> Hi Timo,
>
> Thanks, I’ll give the ResultTypeQueryable interface a try - my previous
> experience registering custom Kryo serializers
Hi Satish,
usually you would use side outputs [1] for that, but afaik asyncIO doesn't
support that (yet).
So your option of using some union type works well. You can then chain a
process function that uses side outputs.
[1]
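A minimal sketch of that pattern, with a hypothetical union type Result carrying either a value or an error; side outputs need a process-style function, a plain MapFunction cannot emit them:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitResults {

    // Hypothetical union type emitted by the async function.
    public static class Result {
        public String value;
        public String error;
    }

    public static final OutputTag<String> ERRORS = new OutputTag<String>("errors") {};

    public static SingleOutputStreamOperator<String> split(DataStream<Result> results) {
        return results.process(new ProcessFunction<Result, String>() {
            @Override
            public void processElement(Result r, Context ctx, Collector<String> out) {
                if (r.error != null) {
                    ctx.output(ERRORS, r.error); // failures to the side output
                } else {
                    out.collect(r.value);        // successes to the main output
                }
            }
        });
    }
}

Consumers then read the failures with split(stream).getSideOutput(SplitResults.ERRORS).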
Hi Nick,
This looks like a valid use case and shouldn't fail. The only workaround I
see is to create some dummy topics then.
On Fri, Jun 11, 2021 at 12:11 AM Martin, Nick J [US] (SP) <
nick.mar...@ngc.com> wrote:
> I’m trying to use the topic discovery feature of the Kafka Connector. The
>
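For context, a minimal sketch of pattern-based topic discovery (broker address, group id and pattern are placeholders):

import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TopicDiscovery {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "discovery-test");
        // re-scan for new matching topics/partitions every 10 seconds
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        env.addSource(new FlinkKafkaConsumer<>(
                Pattern.compile("events-.*"), new SimpleStringSchema(), props))
           .print();

        env.execute("topic-discovery");
    }
}

The failure described above occurs when no topic matches the pattern at startup; hence the dummy-topic workaround.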
Hi Yashwant,
I don't know Beam well, so you might also want to ask on their user list.
But I'll try to answer it from Flink's perspective.
If you want to work with Avro, you should use an AvroSerializer which
supports schema evolution in the best possible way.
PojoSerializer also allows small
hi
You could first print the consumed data with the print connector to see whether it's already garbled there, or whether the garbling only appears after writing to Doris?
Best
JasonLee
On Jun 17, 2021 at 21:31, maker_d...@foxmail.com wrote:
I am using Flink SQL to consume from Kafka and write the data into Doris, but the Chinese characters come out garbled.
The SQL is as follows:
CREATE TABLE `datacollect_business_kafka` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER
I am using Flink SQL to consume from Kafka and write the data into Doris, but the Chinese characters come out garbled.
The SQL is as follows:
CREATE TABLE `datacollect_business_kafka` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'kafka',
'topic' = 'datacollect_business_stage',
Hi Thomas. The bug https://issues.apache.org/jira/browse/FLINK-21028 is
still present in 1.12.1. You would need to upgrade to at least 1.13.0,
1.12.2 or 1.11.4. However as I mentioned before, 1.11.4 hasn't yet been
released. On the other hand both 1.12.2 and 1.13.0 have already been
superseded by
Hi Leonard Xu,
The version is 1.13. Is it a bug? I noticed that the type of column `b` is
integer, but I use it as varchar.
What should the expected behavior be?
At 2021-06-17 20:11:24, "Leonard Xu" wrote:
Hi, houying
It looks like a bug in how the code generator builds the operator
I thought it over carefully: my cluster runs in containers on internal servers, and traffic between containers shouldn't count as going through NAT.
That said, from the network-related monitoring, quite a few machines do have a fair number of connections in TIME-WAIT, around 50k+, but that alone shouldn't cause this problem, I feel.
东东 wrote on Thu, Jun 17, 2021 at 2:48 PM:
>
> If both of these are enabled, the timestamps in connection requests from the same source IP must be increasing; otherwise the (non-increasing) connection requests are treated as invalid and the packets are dropped, which from the client's side looks like intermittent connection timeouts.
>
>
>
>
Hi, houying
It looks like a bug in the code generation for this operator; which Flink version
are you using?
Could you help create a JIRA ticket?
Best,
Leonard
> On Jun 17, 2021 at 19:48, 纳兰清风 wrote:
>
> Hello,
>
> When I am using a CASE WHEN statement in Flink SQL, I get an error as
> follows:
>
Hello,
When I am using a CASE WHEN statement in Flink SQL, I get an error as follows:
org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common
type of GeneratedExpression(field$3,isNull$3,,INT,None) and
Hi community,
I have implemented a join function using CoProcessFunction with
CheckpointedFunction to recover from failures. I added some debug lines to
check if it is restoring and it does. Before the crash, I process events
that fall at processElement2. I create snapshots at snapshotState(),
Hi Robert,
Thank you for your enthusiastic answer
I have understood the current problem and look forward to a good solution and
optimization by the community. I will continue to pay attention to changes in
the community.
Best,
Jason
JasonLee1781
jasonlee1...@163.com
Flink version: Flink 1.12
Hadoop environment: CDH 6.1.1
Submitting in yarn-per-job mode fails; from the stack trace it fails while initializing the HDFS connection. It looks like the correct hdfs-client jar is being used, though, so I don't know why there is still this returnType problem?
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
the
Unsubscribe
--
From: Jingsong Li
Sent: Thursday, Jun 17, 2021 15:37
To: user-zh
Subject: Re: Unsubscribe
To unsubscribe, please send mail to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org
Best,
Jingsong
On Thu, Jun 17, 2021 at 9:51 AM Chongaih Hau
wrote:
> Email address changed, unsubscribe
>
>
Hello,
Currently, I am using confluent Kafka Avro serializer to write to Kafka,
and in the meanwhile register the schema to confluent schema registry.
The problem here is that our upstream is deserialized from msgpack and
converted to a hashmap, which is not serializable
for avro. The map
Hello,
While trying to use the Pyflink DataStream API in Flink 1.13, I have
encountered an error regarding list types. I am trying to read data from a
Kafka topic that contains events in a json format. For example:
{
"timestamp": 1614259940,
"harvesterID": "aws-harvester",
"clientID":
Hello, experts:
Why does the Flink official documentation state that neither Flink on k8s nor native k8s supports per-job mode, while searching turns up plenty of tutorials for it...
Thanks
Hi,
You can refer to the community's state-evolution E2E test code [1]; that whole program uses Avro to declare the relevant classes.
[1]
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-state-evolution-test/src/main
Best,
Yun Tang
From: casel.chen
Sent: Friday, June 11, 2021 8:13
To:
Hi Padarn,
From my experience, de-/serialization might not consume 3x CPU usage, and the
background compaction could also increase the CPU usage. You could use
async-profiler [1] to figure out what really consumed your CPU usage, as it
can also detect the native RocksDB thread stack.
Hi Marco,
which operations do you want to execute in the bootstrap pipeline?
Maybe you don't need to use SQL and old planner. At least this would
simplify the friction by going through another API layer.
The JDBC connector can be used directly in the DataSet API as well.
Regards,
Timo
On
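A minimal sketch of that (driver, URL, query and row schema are placeholders; assumes the JdbcInputFormat builder from flink-connector-jdbc):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

public class JdbcBootstrap {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Reads rows straight from JDBC into a DataSet<Row>, no SQL planner needed.
        JdbcInputFormat format = JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://host:5432/db")
            .setQuery("SELECT id, amount FROM source_table")
            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO,
                                            BasicTypeInfo.DOUBLE_TYPE_INFO))
            .finish();

        DataSet<Row> rows = env.createInput(format);
        rows.print();
    }
}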
Hi Jason,
I hope you don't mind that I brought back the conversation to the user@
mailing list, so that others can benefit from the information as well.
Thanks a lot for sharing your use case. I personally believe that Flink
should support invocations like "flink run -m yarn-cluster
Hi, all
I have a UDF written in Python that wraps model inference, but when I submit the SQL job to a Flink session, it keeps getting killed by the container.
taskmanager command-line arguments:
sun.java.command = org.apache.flink.runtime.taskexecutor.TaskManagerRunner
-Djobmanager.rpc.address=10.50.56.253 --configDir /opt/flink-1.11.2/conf -D
taskmanager.memory.framework.off-heap.size=134217728b
To unsubscribe, please send mail to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org
Best,
Jingsong
On Thu, Jun 17, 2021 at 2:16 PM 金晓龙 wrote:
> Unsubscribe
--
Best, Jingsong Lee
To unsubscribe, please send mail to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org
Best,
Jingsong
On Thu, Jun 17, 2021 at 9:51 AM Chongaih Hau
wrote:
> Email address changed, unsubscribe
>
> Regards,
> Hau ChongAih
>
--
Best, Jingsong Lee
To unsubscribe, please send mail to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org
Best,
Jingsong
On Thu, Jun 17, 2021 at 9:29 AM wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:
>
> Email address changed, unsubscribe!
>
>
>
>
--
Best, Jingsong Lee
Has this problem been solved? I also hit this error when submitting to a YARN cluster with Flink in yarn-per-job mode.
Hi Chirag,
Which Flink version are you using? As far as I understand, the issue is
appearing just by writing the initial data - no recovery happened right?
Could you try to change the code such that you only have a single
read/update on the state? It should work as you have done it but I'd like
I originally wanted to get it from the DeserializationFormat; if that's not possible, getting it in the downstream SQL is fine too.
On 2021-06-17 14:41:55, "Jingsong Li" wrote:
>No, unless you create a new kafka connector yourself.
>
>However,
>information such as the kafka offset and partition can be obtained via metadata.
>Do you need the offset/partition inside the DeserializationFormat, or is it enough for the downstream SQL to get them?
>你是需要在DeserializationFormat里面拿到offset、partition?还是说后续的SQL拿到就行了?
>
>Best,
>Jingsong
>
>On Thu, Jun 17, 2021 at 2:35 PM
If both of these are enabled, the timestamps in connection requests from the same source IP must be increasing; otherwise the (non-increasing) connection requests are treated as invalid and the packets are dropped, which from the client's side looks like intermittent connection timeouts.
Generally a single machine won't hit this, because there is only one clock; it tends to show up behind NAT (since the clocks of multiple hosts are usually not exactly in sync). I don't know your exact architecture, so all I can say is try it.
Finally, it's worth discussing with your ops team: unless you are sure no connections come in through NAT, it's best not to enable both of these.
PS: kernel 4.1 already scrapped tcp_tw_reuse, because too many people fell into this trap
On 2021-06-17 14:07:50, "yidan
No, unless you create a new kafka connector yourself.
However,
information such as the kafka offset and partition can be obtained via metadata.
Do you need the offset/partition inside the DeserializationFormat, or is it enough for the downstream SQL to get them?
Best,
Jingsong
On Thu, Jun 17, 2021 at 2:35 PM Michael Ran wrote:
> dear all :
> There's a small requirement: because the binlog data varies, it's inconvenient to use format="json" directly, so I want to handle it with a custom format.
dear all:
There's a small requirement: because the binlog data varies, it's inconvenient to use format="json" directly, so I want to handle it with a custom format.
But after customizing a format via "implements DeserializationFormatFactory,
SerializationFormatFactory",
I can only process the raw binlog data, while I also want the kafka offset, partition, and other information. I see that Kafka's native DynamicKafkaDeserializationSchema
has a method
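A minimal sketch of the metadata route Jingsong mentions, declaring Kafka's partition and offset as METADATA columns in the DDL (topic, broker and format are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaMetadataExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
            "CREATE TABLE binlog_raw (" +
            "  payload STRING," +
            "  `partition` INT METADATA VIRTUAL," +
            "  `offset` BIGINT METADATA VIRTUAL" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'binlog_topic'," +
            "  'properties.bootstrap.servers' = 'broker:9092'," +
            "  'format' = 'raw'" +
            ")");

        // Downstream SQL can read the metadata columns like any other column.
        tEnv.executeSql("SELECT `partition`, `offset`, payload FROM binlog_raw").print();
    }
}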
Hi Dan,
It's better to split the Kafka partition into multiple partitions.
Here is a way to try without splitting the Kafka partition. Add a rebalance
shuffle between source and the downstream operators, set multiple
parallelism for the downstream operators. But this way would introduce
extra cpu
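A minimal sketch of the suggested shuffle (the elements and mapper are stand-ins for a single-partition Kafka source and the real processing):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a Kafka source reading a single partition.
        DataStream<String> source = env.fromElements("a", "b", "c").setParallelism(1);

        source
            .rebalance()              // round-robin records to all downstream subtasks
            .map(String::toUpperCase) // heavy work now runs at parallelism 16
            .setParallelism(16)
            .print();

        env.execute("rebalance-example");
    }
}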
Hi,
since your state (150gb) seems to fit into memory (700gb), I would
recommend trying the HashMapStateBackend:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-hashmapstatebackend
(unless you know that your state size is going to increase a lot
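A minimal sketch of switching to it with the Flink 1.13 API (the checkpoint path is a placeholder):

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HashMapBackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // State lives on the JVM heap (no per-access serialization);
        // checkpoints still go to durable storage.
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
            new FileSystemCheckpointStorage("s3://bucket/checkpoints"));
        env.enableCheckpointing(60_000);
    }
}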
Unsubscribe
If you are able to execute your job locally as well (with enough data), you
can also run it with a profiler and see the CPU cycles spent on
serialization (you can also use RocksDB locally)
On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson wrote:
> Thanks Robert. I think it would be easy enough to
What's the principle behind this? I can't make this change directly; I have to file a request.
东东 wrote on Thu, Jun 17, 2021 at 1:36 PM:
>
>
>
> Set one of them to 0
>
>
> On 2021-06-17 13:11:01, "yidan zhao" wrote:
> >Yes, that's the host machine IP.
> >
> >net.ipv4.tcp_tw_reuse = 1
> >net.ipv4.tcp_timestamps = 1
> >
> >东东 wrote on Thu, Jun 17, 2021 at 12:52 PM:
> >>
> >> Is 10.35.215.18 the host machine IP?
> >>
> >> Take a look at
Thanks Yun,
Agreed, it seemed unlikely to be state, I just wanted to confirm that this
was unexpected before ruling it out.
Thanks,
Padarn
On Thu, Jun 17, 2021 at 10:45 AM Yun Gao wrote:
> Hi Padarn,
>
> From the current description it seems to me that the issue is not
> related to
> the