Is it possible to use OperatorState when NOT implementing a source or sink function?

2021-06-04 Thread Marco Villalobos
Is it possible to use OperatorState when NOT implementing a source or sink function? If yes, then how?
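For reference, operator (non-keyed) state outside of sources and sinks is normally obtained through the CheckpointedFunction interface; below is a minimal sketch on an ordinary map function (class and state names are illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // A plain map function that also keeps operator (non-keyed) state.
    public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

        private transient ListState<Long> checkpointedCount;
        private long count;

        @Override
        public String map(String value) {
            count++;
            return value;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedCount.clear();
            checkpointedCount.add(count); // persist the running count on each checkpoint
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedCount = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count", Long.class));
            for (Long c : checkpointedCount.get()) { // restore (and merge after rescaling)
                count += c;
            }
        }
    }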

Re: Flink checkpoint is very slow: troubleshooting

2021-06-04 Thread Jacob
Thanks. I have read the relevant documentation [1]. Since there are many scattered interactions with Redis and HBase, not only lookups but also writes back to Redis, I plan to move the entire logic of the previous map operator into the asyncInvoke() method, backed by a thread pool. I'm not sure whether that is appropriate, and wouldn't the ordering of the data then no longer be guaranteed? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
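For later readers, a minimal sketch of that pattern, assuming a hypothetical blocking lookupAndWriteBack() helper standing in for the Redis/HBase logic; ordering can still be preserved by wiring the function in with AsyncDataStream.orderedWait (unorderedWait trades ordering for throughput):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class EnrichFunction extends RichAsyncFunction<String, String> {

        private transient ExecutorService executor;

        @Override
        public void open(Configuration parameters) {
            executor = Executors.newFixedThreadPool(16); // sized for the blocking clients
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> lookupAndWriteBack(input), executor) // blocking Redis/HBase calls
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
        }

        @Override
        public void close() {
            executor.shutdown();
        }

        private String lookupAndWriteBack(String input) {
            return input; // placeholder for the real lookup and write-back logic
        }
    }

    // AsyncDataStream.orderedWait(stream, new EnrichFunction(), 30, TimeUnit.SECONDS, 100)
    // emits results in the original input order despite the concurrent execution.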

Re: Re: Flink sql regular join not working as expected.

2021-06-04 Thread 1095193...@qq.com
Thanks @JING ZHANG @Yun Gao. I will use a processing-time temporal join. BTW, will you support reading the changelog of a JDBC source when it works as the right side of a regular join in the future? 1095193...@qq.com From: JING ZHANG Date: 2021-06-04 18:32 To: Yun Gao CC: 1095193...@qq.com; user Subject:
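For later readers, a processing-time temporal (lookup) join along the lines suggested above would look roughly like this; the tableEnv variable, table names, and columns are invented for illustration, with `dim` backed by the JDBC connector:

    // orders: a Kafka-backed table with a processing-time attribute proc_time;
    // dim: a JDBC-backed dimension table looked up at processing time.
    tableEnv.executeSql(
        "SELECT o.id, d.target " +
        "FROM orders AS o " +
        "JOIN dim FOR SYSTEM_TIME AS OF o.proc_time AS d " +
        "ON o.dim_id = d.id");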

Elasticsearch sink connector timeout

2021-06-04 Thread Kai Fu
Hi team, we encountered an issue with ES sink connector timeouts quite frequently. As far as we checked, the ES cluster is far from being loaded (~40% CPU utilization, no queries, and the indexing rate is also low). We're using the ES-7 connector with 12 data nodes and a parallelism of 32. The error log is as below; we want
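Not a diagnosis, but the ES-7 connector's builder exposes bulk-flush and retry settings that are usually involved in timeout tuning; a hedged sketch with arbitrary values (host, index name, and the element type are placeholders):

    import java.util.Collections;
    import org.apache.http.HttpHost;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.elasticsearch.client.Requests;

    ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
            Collections.singletonList(new HttpHost("es-host", 9200, "http")),
            (element, ctx, indexer) -> indexer.add(
                    Requests.indexRequest()
                            .index("my-index")
                            .source(Collections.singletonMap("data", element))));

    builder.setBulkFlushMaxActions(1000);   // flush after this many buffered actions
    builder.setBulkFlushInterval(5000);     // ...or after 5 s, whichever comes first
    builder.setBulkFlushBackoff(true);      // retry bulk requests that fail or time out
    builder.setBulkFlushBackoffRetries(3);
    builder.setBulkFlushBackoffDelay(1000);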

Re: Flink exported metrics scope configuration

2021-06-04 Thread Kai Fu
Hi Mason, thank you for the advice; I tried it, it works, and it reduces the size a lot. On Fri, Jun 4, 2021 at 11:45 AM Mason Chen wrote: > Hi Kai, > > You can use the excluded variables config for the reporter: > > - metrics.reporter.<name>.scope.variables.excludes: (optional) A > semi-colon (;)
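For later readers, the resulting flink-conf.yaml entry looks roughly like this (the reporter name and the excluded variables are just examples):

    metrics.reporter.my_reporter.scope.variables.excludes: task_attempt_id;task_attempt_num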

Re: Corrupted unaligned checkpoints in Flink 1.11.1

2021-06-04 Thread Alexander Filipchik
Looked through the logs and didn't see anything fishy that indicated an exception during checkpointing. To make it clearer, here is the timeline (we use unaligned checkpoints, and the state size is around 300 GB): T1: Job1 was running. T2: Job1 was savepointed, brought down, and replaced with Job2. T3:

Question about State TTL and Interval Join

2021-06-04 Thread McBride, Chris
We currently have a Flink 1.8 application deployed on Kinesis Data Analytics using the RocksDB state backend. Our application joins across 3 different Kinesis streams using an interval join. We noticed that our checkpoint sizes continue to increase over time; we eventually have OOM
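As background on the TTL part of the question: state TTL is enabled per state descriptor, so it applies to user-declared state rather than to the join operator's internal state. A minimal sketch, with an arbitrary one-day TTL and an illustrative state name:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(1))                                  // expire entries after one day
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh TTL on writes
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastSeen", Long.class);
    descriptor.enableTimeToLive(ttlConfig);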

Unsubscribe

2021-06-04 Thread happiless
Unsubscribe. Sent from my iPhone

Re: Add control mode for Flink

2021-06-04 Thread Peter Huang
I agree with Steven. This logic can be added to a dynamic-config framework that can be bound into Flink operators; we probably don't need to let the Flink runtime handle it. On Fri, Jun 4, 2021 at 8:11 AM Steven Wu wrote: > I am not sure if we should solve this problem in Flink. This is more like > a

Re: Failed to cancel a job using the STOP REST API

2021-06-04 Thread Thomas Wang
Hi Yun, thanks for your reply. We are not using any legacy source. For this specific job, there is only one source, which uses FlinkKafkaConsumer, which I assume has the correct cancel() method implemented. Also, could you suggest how I could use the "request-id" to get the savepoint location?
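For reference, the stop-with-savepoint REST flow is a two-step process; roughly (the IDs below are placeholders):

    POST /jobs/<job-id>/stop
        -> {"request-id": "<trigger-id>"}
    GET /jobs/<job-id>/savepoints/<trigger-id>
        -> status (IN_PROGRESS / COMPLETED) plus, once completed, the savepoint location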

Re: Re: checkpointing enabled, sending messages to Kafka throws OutOfOrderSequenceException

2021-06-04 Thread SmileSmile
Hi, after failover there is still an OutOfOrderSequenceException. Even when I disable checkpointing, the Kafka broker still returns OutOfOrderSequenceException to me. At 2021-06-04 17:52:22, "Yun Gao" wrote: Hi, Have you checked if the error occurs during normal execution, or right after failover? Best,

Re: Add control mode for Flink

2021-06-04 Thread Steven Wu
I am not sure if we should solve this problem in Flink. This is more like a dynamic-config problem that should probably be solved by some configuration framework. Here is one post from a Google search:

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
Thanks Timo. On Fri, Jun 4, 2021, 17:13 Timo Walther wrote: > Hi Yuval, > > I would recommend option 2. Because esp. when it comes to state you > should be in control what is persisted. There is no guarantee that the > ExternalSerializer will not change in the future. It is only meant for >

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Timo Walther
Hi Yuval, I would recommend option 2, because especially when it comes to state you should be in control of what is persisted. There is no guarantee that the ExternalSerializer will not change in the future; it is only meant for shipping data as the input of the next operator. I would recommend to

Add control mode for Flink

2021-06-04 Thread 刘建刚
Hi everyone, Flink jobs are always long-running. While a job is running, users may want to control it without stopping it. The reasons for such control can differ, for example: 1. Changing the data processing logic, such as a filter condition. 2. Sending trigger events to make the
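For comparison with what exists today: the closest built-in mechanism is the broadcast state pattern, where a control stream is broadcast to every subtask of an operator. A rough sketch, assuming DataStream<String> variables `events` and `controls` and treating the control message as a filter string:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    MapStateDescriptor<String, String> controlDesc =
            new MapStateDescriptor<>("control", Types.STRING, Types.STRING);

    BroadcastStream<String> controlStream = controls.broadcast(controlDesc);

    events.connect(controlStream)
          .process(new BroadcastProcessFunction<String, String, String>() {
              @Override
              public void processElement(String event, ReadOnlyContext ctx,
                                         Collector<String> out) throws Exception {
                  String filter = ctx.getBroadcastState(controlDesc).get("filter");
                  if (filter == null || event.contains(filter)) {
                      out.collect(event); // apply the most recently broadcast filter
                  }
              }

              @Override
              public void processBroadcastElement(String newFilter, Context ctx,
                                                  Collector<String> out) throws Exception {
                  ctx.getBroadcastState(controlDesc).put("filter", newFilter); // reaches all subtasks
              }
          });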

Re: Error with extracted type from custom partitioner key

2021-06-04 Thread Timo Walther
Hi Ken, non-POJOs are serialized with Kryo, which might not give you optimal performance. You can register a custom Kryo serializer in the ExecutionConfig to speed up serialization. Alternatively, you can implement `ResultTypeQueryable` to provide a custom type information with a custom
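Both suggestions in sketch form; the serializer class, the key's constructor, and the input type are placeholders:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

    // 1) Register a custom Kryo serializer for the non-POJO key type:
    env.getConfig().registerTypeWithKryoSerializer(MyGroupingKey.class, MyGroupingKeySerializer.class);

    // 2) Or let the function that produces the key declare its type explicitly:
    public class KeyExtractor implements MapFunction<String, MyGroupingKey>,
            ResultTypeQueryable<MyGroupingKey> {

        @Override
        public MyGroupingKey map(String value) {
            return new MyGroupingKey(value); // illustrative
        }

        @Override
        public TypeInformation<MyGroupingKey> getProducedType() {
            return TypeInformation.of(MyGroupingKey.class); // or a hand-written TypeInformation
        }
    }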

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
Hi Timo, thank you for the response. In reality the tables being created are based on arbitrary SQL code, so I don't know what the schema actually is in order to create the TypeInformation "by hand" and pass it on to the DataStream API. This leaves me with option 1, which leads to another question:

Error with extracted type from custom partitioner key

2021-06-04 Thread Ken Krugler
Hi all, I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, with a DataSet) to do a better job of distributing data to tasks. The classes look like: public class MyPartitioner implements Partitioner<MyGroupingKey> { ... } public class MyGroupingKey implements Comparable<MyGroupingKey> {
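For readers without the full thread, a rough completion of those skeletons and how they are typically wired into a DataSet (the field, constructor, and key extraction below are invented for illustration):

    import org.apache.flink.api.common.functions.Partitioner;

    public class MyGroupingKey implements Comparable<MyGroupingKey> {
        private final String value;

        public MyGroupingKey(String value) { this.value = value; }

        public String getValue() { return value; }

        @Override
        public int compareTo(MyGroupingKey other) { return value.compareTo(other.value); }
    }

    public class MyPartitioner implements Partitioner<MyGroupingKey> {
        @Override
        public int partition(MyGroupingKey key, int numPartitions) {
            return Math.floorMod(key.getValue().hashCode(), numPartitions); // spread keys over tasks
        }
    }

    // dataSet.partitionCustom(new MyPartitioner(), record -> new MyGroupingKey(record));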

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Timo Walther
Hi Yuval, TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge between TypeInformation and DataType for as long as TypeInformation was still exposed through the Table API. Beginning with Flink 1.13, the Table API is able to serialize the records to the first DataStream operator
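Presumably the replacements meant here are the Flink 1.13 conversion methods on StreamTableEnvironment; a minimal sketch, assuming a tableEnv of that type and an illustrative query:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    Table table = tableEnv.sqlQuery("SELECT id, name FROM my_table");
    DataStream<Row> stream = tableEnv.toDataStream(table);          // insert-only results
    // DataStream<Row> changes = tableEnv.toChangelogStream(table); // updating results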

Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
When upgrading to Flink 1.13, I ran into deprecation warnings on TypeConversions. The deprecation message states that this API will be deprecated soon, but does not mention the alternatives that can be used for these transformations. My use case is that I have a table that

Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Wouter Zorgdrager
Hi Dian, all, Thanks for your suggestion. Unfortunately, it does not seem to work. I get the following exception: Caused by: java.lang.NegativeArraySizeException: -2147183315 at

Re: Flink checkpoint is very slow: troubleshooting

2021-06-04 Thread yidan zhao
There is official documentation for this. The essence: if one asynchronous operation takes, say, 1 ms, then with synchronous access one second only fits about 1,000 operations, which is far too low a QPS; asynchronous access can raise the QPS substantially. Jacob <17691150...@163.com> wrote on Fri, Jun 4, 2021 at 5:58 PM: > > Yes, your description is correct; that is roughly how the job executes. > > I see what you mean now. > > Thanks for the idea. I need to study this async operator; I have never used it before and don't know what the flow looks like. Do you have a related demo, or which part of the material should I look at? > > - > Thanks! > Jacob > -- > Sent from:

Reply: Flink 1.12, yarn-application mode: the Flink web UI shows no logs

2021-06-04 Thread smq
Thank you very much. I will test it tomorrow, and once it is resolved I will write up the problem. -- Original message -- From: r pp http://apache-flink.147419.n8.nabble.com/ -- Best, pp

Re: Flink sql regular join not working as expected.

2021-06-04 Thread JING ZHANG
Hi, a JDBC source only takes a snapshot and sends all data in the snapshot downstream when it works as the right stream of a regular join; it cannot produce a changelog stream. After you update the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080', the JDBC source will not send the new data

Re: Flink 1.12, yarn-application mode: the Flink web UI shows no logs

2021-06-04 Thread r pp
Hi~ On my side it is per-job on YARN mode, and I looked at the YARN container logs. Generally, during program initialization the log file name and log path are generated and placed into the environment (env), which then forms the config used to start the cluster. The log path is determined by the YARN configuration, and Flink later picks this information up for display in the web UI. So you may need to check whether your log file path has changed. In my test, changing the log file name still lets the Flink web UI display logs normally, but changing the path makes the display fail. I can't narrow the difference down further, though; too little information was provided. env environment info

Re: checkpointing enabled, sending messages to Kafka throws OutOfOrderSequenceException

2021-06-04 Thread Yun Gao
Hi, Have you checked if the error occurs during normal execution, or right after failover? Best, Yun -- From: SmileSmile Send Time: 2021 Jun. 4 (Fri.) 11:07 To: user Subject: checkpointing enabled, sending messages to Kafka

Re: Failed to cancel a job using the STOP REST API

2021-06-04 Thread Yun Gao
Hi Thomas, I think you are right that the CLI also uses the same REST API underneath, and since the response of the REST API is OK and the savepoint is triggered successfully, I reckon that it might not be due to the REST API process, and we might still first focus on the stop-with-savepoint

Re: Flink sql regular join not working as expected.

2021-06-04 Thread Yun Gao
Hi, I'm not an expert on the Table/SQL side, but it seems to me that for regular joins, Flink would not re-read the dimension table after it has read it fully for the first time. If you want to always join the records with the latest version of the dimension table, you may need to use the temporal

Flink sql regular join not working as expected.

2021-06-04 Thread 1095193...@qq.com
Hi, I am working on joining a Kafka stream with a Postgres dimension table. According to https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/: "Regular joins are the most generic type of join in which any new record, or changes to either side of the

Re: Flink checkpoint is very slow: troubleshooting

2021-06-04 Thread yidan zhao
I see what you mean: each input record goes through Redis, HBase, etc., gets the related adjustments (such as setting fields), and the record then continues as the output of this operator, right? What I meant is that you need to access Redis and HBase asynchronously, implemented together with Flink's async operator; so yes, the requirement you describe can be satisfied with the asynchronous approach. Jacob <17691150...@163.com> wrote on Fri, Jun 4, 2021 at 3:21 PM: > > @nobleyd Thanks for the reply. > > Is the Redis/HBase access in your job A asynchronous or synchronous? --- Synchronous. > > You are probably using aligned checkpoints, right?

Flink 1.11 application mode: how to specify the number of TaskManagers launched on k8s

2021-06-04 Thread Jun Zou
Hi all: I submit a job with Flink 1.11.2 in application mode on k8s, and the number of TMs the job requests does not match what I expect. The job registers a Kafka source and an HDFS sink through the DataStream API and uses SQL for the intermediate operations; the SQL logic is map-only, and the Kafka topic has 4 partitions. First, I submitted the same kind of job on YARN with the following settings: > taskmanager.numberOfTaskSlots:1 > parallelism.default:4 This produced 4 TaskManagers. On k8s, however, I configured the following: >
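For reference, the same two settings can be passed to native Kubernetes application mode on the command line; with one slot per TM and a default parallelism of 4, one would expect 4 TaskManagers (the image name, cluster id, and jar path below are placeholders):

    ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=my-cluster \
        -Dkubernetes.container.image=my-flink-image \
        -Dtaskmanager.numberOfTaskSlots=1 \
        -Dparallelism.default=4 \
        local:///opt/flink/usrlib/my-job.jar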

Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Dian Fu
Hi Wouter, > E org.apache.flink.api.python.shaded.py4j.Py4JException: > Constructor > org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class > org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class >
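For context, the Java constructor being resolved here is TypeInformationSerializationSchema(TypeInformation, ExecutionConfig); a sketch of a direct instantiation on the Java side, with Integer as an example element type:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
    import org.apache.flink.api.common.typeinfo.Types;

    TypeInformationSerializationSchema<Integer> schema =
            new TypeInformationSerializationSchema<>(Types.INT, new ExecutionConfig());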

Re: Flink stream processing issue

2021-06-04 Thread yidan zhao
Yes, if you use KeyedCoProcess, Flink will ensure that. Qihua Yang wrote on Fri, Jun 4, 2021 at 12:32 AM: > > Sorry for the confusion. Yes, I mean multiple parallelism. Really thanks > for your help. > > Thanks, > Qihua > > On Thu, Jun 3, 2021 at 12:03 AM JING ZHANG wrote: >> >> Hi Qihua, >> >> I’m sorry
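In sketch form, assuming two DataStream<String> inputs `left` and `right` keyed the same way:

    DataStream<String> joined = left.connect(right)
        .keyBy(l -> l, r -> r) // equal keys are routed to the same parallel subtask
        .process(new KeyedCoProcessFunction<String, String, String, String>() {
            @Override
            public void processElement1(String value, Context ctx, Collector<String> out) {
                out.collect("left: " + value);
            }

            @Override
            public void processElement2(String value, Context ctx, Collector<String> out) {
                out.collect("right: " + value);
            }
        });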

Re: Flink checkpoint is very slow: troubleshooting

2021-06-04 Thread yidan zhao
Is the Redis/HBase access in your job A asynchronous or synchronous? Synchronous definitely won't work. The checkpoint is small simply because there is hardly any state; the long duration is probably barrier alignment time. You are probably using aligned checkpoints, right? If you switch to unaligned checkpoints the time should come down, but the state will become much larger; you can give it a try. The best practice is to make the access asynchronous, not synchronous. JasonLee <17610775...@163.com> wrote on Fri, Jun 4, 2021 at 10:57 AM: > > hi > > Keep the source parallelism equal to the number of partitions; don't make it larger than the partition count, because that leaves subtasks > idle and wastes resources. You only need to set the map
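For reference, the unaligned checkpoints mentioned above are switched on through the checkpoint config; a minimal sketch:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);                        // checkpoint every 60 s
    env.getCheckpointConfig().enableUnalignedCheckpoints(); // don't wait for barrier alignment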