Re: Re:Re: Re: Flink interval join data loss question

2022-06-14 Thread Zhiwen Sun
I suspect this is a watermark problem. Looking at the original poster's settings, the watermark is -2s, which means that any record in the order header stream
that is more than 2s late gets dropped.
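For reference, this is roughly how a 2-second bounded-out-of-orderness watermark would be declared on the rowtime column (only a sketch; the stream variable and the substring expression are taken from the code quoted later in this thread):

// Sketch: accept records up to 2s out of order instead of pushing the watermark ahead of rowtime.
Table headerTable = streamTableEnvironment.fromDataStream(
        headerFilterStream,
        Schema.newBuilder()
                .columnByExpression("rowtime",
                        "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
                // bounded out-of-orderness: watermark = rowtime - 2s
                .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
                .build());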

You also mentioned earlier that the order detail records are generated only a few seconds after the order header records; that is just the difference in generation time for the same order. If that is the case, a plain inner join plus a state TTL
can satisfy the requirement.
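Something like the following is what I have in mind (a rough sketch only, reusing the header/item views registered in the code quoted below; the 20 s TTL is just the value from this thread and has to be larger than the real gap between header and detail records):

// Sketch: plain inner join that relies on the state TTL instead of event-time bounds.
streamTableEnvironment.getConfig().getConfiguration()
        .setString("table.exec.state.ttl", "20 s");

Table joined = streamTableEnvironment.sqlQuery(
        "SELECT header.id, item.goods_id, item.p_sp_sub_amt "
      + "FROM item JOIN header ON header.id = item.order_id");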

BTW: in my experience watermarks are hard to use well, and the scenarios where they really fit are quite limited.



Zhiwen Sun



On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang  wrote:

> > I set the state TTL for the SQL inner join to 20s. Why is the data retained by a 20s TTL more accurate than the interval join with minute-level bounds?
>
> An unreasonable watermark setting will cause the interval join to drop data. With a TTL, if the data for a key
> is accessed frequently, that data will not expire.
>
> > What I did was check the TM and JM logs, and I did not see any log entry for the table configuration I set.
>
> As far as I remember, the relevant configuration is printed in the logs. Could you share some of the related logs?
>
> best,
> Shengkai
>
> lxk wrote on Tue, Jun 14, 2022 at 20:04:
>
> > Hi,
> >   I am now using a SQL interval join with the bounds widened to the minute level, -2 minutes and +4 minutes.
> > The row count is now close to what the inner join produces. The code is below:
> > Table headerTable =
> > streamTableEnvironment.fromDataStream(headerFilterStream,
> >  Schema.newBuilder()
> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> > TIMESTAMP_LTZ(3))")
> > .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> > .build());
> > Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream,
> > Schema.newBuilder()
> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> > TIMESTAMP_LTZ(3))")
> > .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> > .build());
> >
> >
> > streamTableEnvironment.createTemporaryView("header",headerTable);
> > streamTableEnvironment.createTemporaryView("item",itemTable);
> > Table result = streamTableEnvironment.sqlQuery("select
> header.customer_id"
> > +
> > ",item.goods_id" +
> > ",header.id" +
> > ",header.order_status" +
> > ",header.shop_id" +
> > ",header.parent_order_id" +
> > ",header.order_at" +
> > ",header.pay_at" +
> > ",header.channel_id" +
> > ",header.root_order_id" +
> > ",item.id" +
> > ",item.row_num" +
> > ",item.p_sp_sub_amt" +
> > ",item.display_qty" +
> > ",item.qty" +
> > ",item.bom_type" +
> > " from item JOIN header on header.id = item.order_id and item.rowtime
> > BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime +
> INTERVAL
> > '4' MINUTE");
> >
> >   This raises a new question for me: I set the state TTL for the SQL inner join to 20s, so why is the data retained by a 20s TTL
> > more accurate than the interval join with minute-level bounds? I'd like to hear everyone's thoughts on this.
> >   My guess is that the TTL I set on the tables did not take effect, so the inner join keeps the full data in state, which is why its result
> > is more accurate than the interval join. To check this I went through the TM and JM logs and did not see any log entry for the table configuration I set. My configuration is as follows:
> > Configuration conf = new Configuration();
> > conf.setString("table.exec.mini-batch.enabled","true");
> > conf.setString("table.exec.mini-batch.allow-latency","15 s");
> > conf.setString("table.exec.mini-batch.size","100");
> > conf.setString("table.exec.state.ttl","20 s");
> > env.configure(conf);
> > StreamTableEnvironment streamTableEnvironment =
> > StreamTableEnvironment.create(env,
> > EnvironmentSettings.fromConfiguration(conf));
> >
> >
> > I would like to know: do the TM and JM logs correctly reflect whether my configuration took effect? If not, how can I find out whether this configuration is actually in effect?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 2022-06-13 21:12:48, "Xuyang" wrote:
> > >Hi,
> > >  1. In theory the inner join should match more data than the interval join. As for the two streams progressing at different speeds,
> > > that should not be a big problem, because expired data in state is only cleared after the watermarks from both sides have arrived.
> > >  2. Without a watermark, the inner join emits a record as soon as it arrives, based purely on the time the record enters the operator,
> > > i.e. processing time. By default nothing expires and the full data set is kept in state. If a TTL is defined, it depends on the primary keys and join keys
> > > of the two tables; different state layouts (ListState, MapState, etc.) are used to store the data. In principle, data that has not been updated for the configured period is cleared, and this is measured in processing time.
> > >
> > >
> > >If anything I said is wrong, please correct me.
> > >
> > >
> > >
> > >
> > >--
> > >
> > >Best!
> > >Xuyang
> > >
> > >
> > >
> > >
> > >
> > >On 2022-06-12 14:39:39, "lxk7...@163.com" wrote:
> > >>Thanks a lot for the reply.
> > >>1. Regarding the watermark, I will run more tests. I will also test whether the interval join still loses data when using processing time.
> > >>2. Regarding the interval join, my understanding was that it can match a wider range of data than the inner
> > >>join, so the result should be more accurate; but the result showed data loss, which really surprised me and somewhat overturned my understanding. I also have a new guess: the two streams have different volumes, which might cause data loss as well. The left stream is order-level data and the right stream is order-item-level data, which is much larger. My understanding is that processing the right stream is a bit slower, so the time progress on the two sides may differ. But that raises a new question: the inner
> > >>join should be affected in the same way.
> > >>3. One more thing I may not have explained clearly: I use an inner join in SQL
> > >>without registering a watermark, so the join of the two streams should be defined in processing time? Is the expiration of the table state then also defined in processing time?
> > >>
> > >>
> > >>
> > >>lxk7...@163.com
> > >>
> > >>From: Shengkai Fang
> > >>Sent: 2022-06-11 20:35
> > >>To: user-zh
> > >>Subject: Re: Re: Flink interval join data loss question
> > >>hi,
> > >>
> > >>On the first point: there are many ways to lose data. First, confirm whether it is really the JOIN operator that drops the data (incorrect use of the SINK can also drop data). If it is clearly the join
> > >>operator, I suggest working out what the dropped records look like: is the watermark configured unreasonably, so that records are mistakenly treated as late and dropped? For example, here
> > >>`event time` = `rowtime` - 2s; is that appropriate? I seem to remember it is usually +2s.
> > >>
> > >>On the second point: my preliminary understanding of the interval join is that state cleanup is driven by the event time of both sides, i.e. advancing the event
> > >>time of the right stream affects the cleanup of the left stream's data. For example, if the right stream reaches 12:00 and the join condition requires the left stream's time to be within 1h of the right stream's time, then all left-stream
> > >>data before 11:00 can be cleaned up.
> > >>
> > >>On the third point: I don't think it can. The current inner join + state cleanup cannot cover an event-time window join.
> > >>
> > >>best,
> > >>Shengkai
> > >>
> > >>lxk7...@163.com wrote on Fri, Jun 10, 2022 at 23:03:
> > >>
> > >>> I still have big doubts about this issue, so let me describe my scenario again:
> > >>>
> > >>>
> > >>> We are using Flink for a two-stream join. One stream is the order header table and the other is the order detail table. We examined the offline data: the order detail records are generally generated within a few seconds after the order header records, so the gap is at the second level.
> > >>>
> > >>> We ran the following rounds of tests and compared the row counts against another table that is written in real time. (That table is the baseline reference: it is simply persisted without any processing, the data sources on both sides are the same, and the comparison criteria are the same.)
> > >>> 1. DataStream API, using Kafka's built-in timestamps as watermarks, with an interval join. After comparison, data was missing.
> > >>> 2. Stream-to-table conversion, SQL inner join, no watermark set. The compared result was correct.
> > >>> 3. Stream-to-table conversion, SQL interval 

Re: Does flink-connector-jdbc support multiple VALUES rows?

2022-06-14 Thread Zhiwen Sun
Yes, writing multiple VALUES rows in one statement is supported; it is controlled by the JDBC URL — set *rewriteBatchedStatements=true*

The generated SQL looks like this:

INSERT INTO `order_summary`(`order_id`, `proctime`, `order_status`,
> `order_name`, `total`)
>  VALUES
>   (3, '2022-06-14 22:31:24.699', 'OK', 'order-name-1', 20) ,
>   (2, '2022-06-14 22:31:21.496', 'OK', 'order-name-1', 131)
> ON DUPLICATE KEY UPDATE `order_id`=VALUES(`order_id`),
> `proctime`=VALUES(`proctime`), `order_status`=VALUES(`order_status`),
> `order_name`=VALUES(`order_name`), `total`=VALUES(`total`)
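For reference, a sketch of the corresponding Flink DDL (table name, columns and the MySQL URL are taken from the example above and are only illustrative; the buffer-flush settings are the standard JDBC connector options):

// Sketch: JDBC upsert sink; rewriteBatchedStatements=true is passed through the JDBC URL,
// and the PRIMARY KEY makes the connector generate the ON DUPLICATE KEY UPDATE statement above.
tableEnv.executeSql(
    "CREATE TABLE order_summary ("
  + "  order_id INT,"
  + "  proctime TIMESTAMP(3),"
  + "  order_status STRING,"
  + "  order_name STRING,"
  + "  total INT,"
  + "  PRIMARY KEY (order_id) NOT ENFORCED"
  + ") WITH ("
  + "  'connector' = 'jdbc',"
  + "  'url' = 'jdbc:mysql://localhost:3306/mydb?rewriteBatchedStatements=true',"
  + "  'table-name' = 'order_summary',"
  + "  'sink.buffer-flush.max-rows' = '500',"
  + "  'sink.buffer-flush.interval' = '1s'"
  + ")");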



Zhiwen Sun



On Mon, Mar 7, 2022 at 5:07 PM 黑色  wrote:

> If you look at the underlying implementation in the source code you will see: it uses INSERT INTO x() VALUES () ON DUPLICATE
> KEY to implement insert/update, so no, it does not.
>
>
>
>
> -- Original message --
> From: "payne_z"  Sent: Monday, March 7, 2022, 3:49 PM
> To: "user-zh"  Subject: Does flink-connector-jdbc support multiple VALUES rows?
>
>
>
> Does flink-connector-jdbc support writing multiple VALUES rows at once?


Re: Re:Re: Re: Flink interval join data loss question

2022-06-14 Thread Shengkai Fang
> I set the state TTL for the SQL inner join to 20s. Why is the data retained by a 20s TTL more accurate than the interval join with minute-level bounds?

An unreasonable watermark setting will cause the interval join to drop data. With a TTL, if the data for a key
is accessed frequently, that data will not expire.

> What I did was check the TM and JM logs, and I did not see any log entry for the table configuration I set.

As far as I remember, the relevant configuration is printed in the logs. Could you share some of the related logs?

best,
Shengkai

lxk wrote on Tue, Jun 14, 2022 at 20:04:

> Hi,
>   I am now using a SQL interval join with the bounds widened to the minute level, -2 minutes and +4 minutes.
> The row count is now close to what the inner join produces. The code is below:
> Table headerTable =
> streamTableEnvironment.fromDataStream(headerFilterStream,
>  Schema.newBuilder()
> .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> TIMESTAMP_LTZ(3))")
> .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> .build());
> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream,
> Schema.newBuilder()
> .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> TIMESTAMP_LTZ(3))")
> .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> .build());
>
>
> streamTableEnvironment.createTemporaryView("header",headerTable);
> streamTableEnvironment.createTemporaryView("item",itemTable);
> Table result = streamTableEnvironment.sqlQuery("select header.customer_id"
> +
> ",item.goods_id" +
> ",header.id" +
> ",header.order_status" +
> ",header.shop_id" +
> ",header.parent_order_id" +
> ",header.order_at" +
> ",header.pay_at" +
> ",header.channel_id" +
> ",header.root_order_id" +
> ",item.id" +
> ",item.row_num" +
> ",item.p_sp_sub_amt" +
> ",item.display_qty" +
> ",item.qty" +
> ",item.bom_type" +
> " from item JOIN header on header.id = item.order_id and item.rowtime
> BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime + INTERVAL
> '4' MINUTE");
>
>   This raises a new question for me: I set the state TTL for the SQL inner join to 20s, so why is the data retained by a 20s TTL
> more accurate than the interval join with minute-level bounds? I'd like to hear everyone's thoughts on this.
>   My guess is that the TTL I set on the tables did not take effect, so the inner join keeps the full data in state, which is why its result
> is more accurate than the interval join. To check this I went through the TM and JM logs and did not see any log entry for the table configuration I set. My configuration is as follows:
> Configuration conf = new Configuration();
> conf.setString("table.exec.mini-batch.enabled","true");
> conf.setString("table.exec.mini-batch.allow-latency","15 s");
> conf.setString("table.exec.mini-batch.size","100");
> conf.setString("table.exec.state.ttl","20 s");
> env.configure(conf);
> StreamTableEnvironment streamTableEnvironment =
> StreamTableEnvironment.create(env,
> EnvironmentSettings.fromConfiguration(conf));
>
>
> I would like to know: do the TM and JM logs correctly reflect whether my configuration took effect? If not, how can I find out whether this configuration is actually in effect?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2022-06-13 21:12:48, "Xuyang" wrote:
> >Hi,
> >  1. In theory the inner join should match more data than the interval join. As for the two streams progressing at different speeds,
> > that should not be a big problem, because expired data in state is only cleared after the watermarks from both sides have arrived.
> >  2. Without a watermark, the inner join emits a record as soon as it arrives, based purely on the time the record enters the operator,
> > i.e. processing time. By default nothing expires and the full data set is kept in state. If a TTL is defined, it depends on the primary keys and join keys
> > of the two tables; different state layouts (ListState, MapState, etc.) are used to store the data. In principle, data that has not been updated for the configured period is cleared, and this is measured in processing time.
> >
> >
> >If anything I said is wrong, please correct me.
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >
> >On 2022-06-12 14:39:39, "lxk7...@163.com" wrote:
> >>Thanks a lot for the reply.
> >>1. Regarding the watermark, I will run more tests. I will also test whether the interval join still loses data when using processing time.
> >>2. Regarding the interval join, my understanding was that it can match a wider range of data than the inner
> >>join, so the result should be more accurate; but the result showed data loss, which really surprised me and somewhat overturned my understanding. I also have a new guess: the two streams have different volumes, which might cause data loss as well. The left stream is order-level data and the right stream is order-item-level data, which is much larger. My understanding is that processing the right stream is a bit slower, so the time progress on the two sides may differ. But that raises a new question: the inner
> >>join should be affected in the same way.
> >>3. One more thing I may not have explained clearly: I use an inner join in SQL
> >>without registering a watermark, so the join of the two streams should be defined in processing time? Is the expiration of the table state then also defined in processing time?
> >>
> >>
> >>
> >>lxk7...@163.com
> >>
> >>From: Shengkai Fang
> >>Sent: 2022-06-11 20:35
> >>To: user-zh
> >>Subject: Re: Re: Flink interval join data loss question
> >>hi,
> >>
> >>On the first point: there are many ways to lose data. First, confirm whether it is really the JOIN operator that drops the data (incorrect use of the SINK can also drop data). If it is clearly the join
> >>operator, I suggest working out what the dropped records look like: is the watermark configured unreasonably, so that records are mistakenly treated as late and dropped? For example, here
> >>`event time` = `rowtime` - 2s; is that appropriate? I seem to remember it is usually +2s.
> >>
> >>On the second point: my preliminary understanding of the interval join is that state cleanup is driven by the event time of both sides, i.e. advancing the event
> >>time of the right stream affects the cleanup of the left stream's data. For example, if the right stream reaches 12:00 and the join condition requires the left stream's time to be within 1h of the right stream's time, then all left-stream
> >>data before 11:00 can be cleaned up.
> >>
> >>On the third point: I don't think it can. The current inner join + state cleanup cannot cover an event-time window join.
> >>
> >>best,
> >>Shengkai
> >>
> >>lxk7...@163.com wrote on Fri, Jun 10, 2022 at 23:03:
> >>
> >>> I still have big doubts about this issue, so let me describe my scenario again:
> >>>
> >>>
> >>> We are using Flink for a two-stream join. One stream is the order header table and the other is the order detail table. We examined the offline data: the order detail records are generally generated within a few seconds after the order header records, so the gap is at the second level.
> >>>
> >>> We ran the following rounds of tests and compared the row counts against another table that is written in real time. (That table is the baseline reference: it is simply persisted without any processing, the data sources on both sides are the same, and the comparison criteria are the same.)
> >>> 1. DataStream API, using Kafka's built-in timestamps as watermarks, with an interval join. After comparison, data was missing.
> >>> 2. Stream-to-table conversion, SQL inner join, no watermark set. The compared result was correct.
> >>> 3. Stream-to-table conversion, SQL interval join, extracting the watermark from the event time in the data. After comparison, data was missing.
> >>>  From these results I don't quite understand why the SQL inner join keeps the data accurate while the interval
> >>> join does not. Is there a good method or line of thinking that would help me understand the cause of this problem?
> >>>
> >>>
> >>> For the second approach, my question is: since no watermark is set in the SQL, is the table state expiration measured in processing time? For a join with a table-state TTL configured like this, can I understand this inner
> >>> join as effectively a window join?
> >>>
> >>>
> >>>
> >>> lxk7...@163.com
> >>>
> >>> From: lxk
> >>> Sent: 2022-06-10 18:18
> >>> To: user-zh
> >>> Subject: Re:Re:Re:Re:Re:Re:Re: Flink interval join data loss question
> >>>
> >>>
> >>>
> >>> I have now switched to a SQL interval join; the code and execution plan are below. The rest of the configuration is unchanged, and the row count is still low, while the inner join has no such problem.
> >>>
> >>>
> >>>
> >>>
> >>> Table headerTable =
> >>> 

Re: Spike in checkpoint start delay every 15 minutes

2022-06-14 Thread Hangxiang Yu
Hi, Jai.
Could you share your checkpoint configuration (interval,
min-pause, and so on) and the checkpoint details from the Flink UI?
I guess the start delay of a checkpoint may be related to the completion
time of the previous checkpoint, as you can see in
CheckpointRequestDecider#chooseRequestToExecute.
Maybe every 3rd or 4th checkpoint lasts longer due to
the flush mechanism of RocksDB?
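For reference, these are the settings I mean (only a sketch with placeholder values, not a recommendation):

// Sketch: checkpoint settings whose interaction usually explains start delay —
// CheckpointRequestDecider holds a new request until min-pause after the previous completion.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);                                 // checkpoint interval: 1 min
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L); // min pause after completion
env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000L);     // checkpoint timeout: 10 min
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // at most one in flight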

Best,
Hangxiang.

On Wed, Jun 15, 2022 at 6:27 AM Jai Patel 
wrote:

> We've noticed a spike in the start delays in our incremental checkpoints
> every 15 minutes.  The Flink job seems to start out smooth, with
> checkpoints in the 15s range and negligible start delays.  Then every
> 3rd or 4th checkpoint has a long start delay (~2-3 minutes).  The
> checkpoints in between have negligible start delays and are fast.  So:
>
> 2-3 fast with negligible start delay, total time 15-30s
> 1-2 slow with 2-3 minute start delay, total time 15-30s longer than the
> start delay.
>
> What could cause this?  We have a couple output topics that are
> EXACTLY_ONCE, but I switched them to AT_LEAST_ONCE and continued to see the
> behavior.
>
> Thanks.
> Jai
>


A question about the PyFlink development environment

2022-06-14 Thread 张 兴博
Hello:
   I am a user learning PyFlink. I want to develop PyFlink programs on Ubuntu 20.04, but when I run my code I get the following error:

Traceback (most recent call last):
  File "/root/.py", line 6, in 
s_env = StreamExecutionEnvironment.get_execution_environment()
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 805, in get_execution_environment
return StreamExecutionEnvironment(j_stream_exection_environment)
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 62, in __init__
self._open()
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 973, in _open
startup_loopback_server()
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 963, in startup_loopback_server
from pyflink.fn_execution.beam.beam_worker_pool_service import \
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py",
 line 31, in 
from apache_beam.options.pipeline_options import DebugOptions
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/__init__.py", line 
96, in 
from apache_beam import io
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/__init__.py", 
line 23, in 
from apache_beam.io.avroio import *
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/avroio.py", line 
63, in 
from apache_beam.io import filebasedsink
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/io/filebasedsink.py", line 
36, in 
from apache_beam.io import iobase
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/iobase.py", line 
57, in 
from apache_beam.transforms import Impulse
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/__init__.py", 
line 25, in 
from apache_beam.transforms.external import *
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/external.py", 
line 45, in 
from apache_beam.runners import pipeline_context
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/runners/pipeline_context.py",
 line 51, in 
from apache_beam.transforms import environments
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/environments.py",
 line 54, in 
from apache_beam.runners.portability.sdk_container_builder import 
SdkContainerImageBuilder
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/sdk_container_builder.py",
 line 44, in 
from apache_beam.internal.gcp.auth import get_service_credentials
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/internal/gcp/auth.py", line 
28, in 
from oauth2client.client import GoogleCredentials
  File "/usr/local/lib/python3.8/dist-packages/oauth2client/client.py", line 
39, in 
from oauth2client import transport
  File "/usr/local/lib/python3.8/dist-packages/oauth2client/transport.py", line 
17, in 
import httplib2
ModuleNotFoundError: No module named 'httplib2'

From searching around it seems that httplib2 is no longer used in newer Python versions and the name is now http.client?
My Python version is 3.8.10 and the JDK is OpenJDK 11.0.15 (another machine runs Java 1.8).
I would like to know what causes this and how I can solve it.

Thank you very much for taking the time to answer my question!

Sent from Mail for Windows 11



Spike in checkpoint start delay every 15 minutes

2022-06-14 Thread Jai Patel
We've noticed a spike in the start delays in our incremental checkpoints
every 15 minutes.  The Flink job seems to start out smooth, with
checkpoints in the 15s range and negligible start delays.  Then every
3rd or 4th checkpoint has a long start delay (~2-3 minutes).  The
checkpoints in between have negligible start delays and are fast.  So:

2-3 fast with negligible start delay, total time 15-30s
1-2 slow with 2-3 minute start delay, total time 15-30s longer than the
start delay.

What could cause this?  We have a couple output topics that are
EXACTLY_ONCE, but I switched them to AT_LEAST_ONCE and continued to see the
behavior.

Thanks.
Jai


Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-14 Thread Jing Ge
Hi Bastien,

Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within
Flink in the master branch. Could you please point out the code where the
committed offset is used as the default?

W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
is used, an exception will be thrown at runtime in case there is no
committed offset, which is useful if the user intends to read from the
committed offset but something is wrong. It might feel odd as the default,
because an exception would be thrown when users start new jobs
with default settings.
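For completeness, the old group-offsets behavior can still be requested explicitly (a sketch; broker address, topic and group id are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Sketch: start from the committed group offsets and fall back to EARLIEST when none exist,
// which is the combination mentioned in this thread as matching the old setStartFromGroupOffsets().
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();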

Best regards,
Jing

On Tue, Jun 14, 2022 at 4:15 PM bastien dine  wrote:

> Hello everyone,
>
> Does someone know why the starting offset behaviour has changed in the new
> Kafka Source ?
>
> This is now from earliest (code in KafkaSourceBuilder), doc says :
> "If offsets initializer is not specified, OffsetsInitializer.earliest() will
> be used by default." from :
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>
> Before in old FlinkKafkaConsumer it was from committed offset (i.e : 
> setStartFromGroupOffsets()
> method)
>
> which match with this behaviour in new KafkaSource :   :
> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST
>
> This change can lead to big trouble if users pay no attention to this
> point when migrating from the old KafkaConsumer to the new KafkaSource.
>
> Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: Apache Flink - Reading data from Scylla DB

2022-06-14 Thread Jing Ge
Hi,

Please be aware that SourceFunction will be deprecated soon [1]. It is
recommended to build a new source connector based on the new Source API
introduced by FLIP-27 [2]. You might take the Kafka connector as the
reference implementation.

Best regards,
Jing

[1] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface



On Tue, Jun 14, 2022 at 6:30 AM yuxia  wrote:

> It seems you may need to implement a custom connector for Scylla DB, as I
> haven't found an existing one.
> I hope the docs [1][2] can help you implement your own connector.
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
> [2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html
>
> Best regards,
> Yuxia
>
> --
> *From:* "Himanshu Sareen" 
> *To:* "User" 
> *Sent:* Tuesday, June 14, 2022, 11:29:38 AM
> *Subject:* Apache Flink - Reading data from Scylla DB
>
> Team,
>
> I'm looking for a solution to Consume/Read data from Scylla DB into Apache
> Flink.
>
> If anyone can guide me or share pointers it will be helpful.
>
> Regards,
> Himanshu
>
>


Re: NegativeArraySizeException trying to take a savepoint

2022-06-14 Thread Mike Barborak
Thank you for your replies.

Upgrading is in our plans but I think Yun is saying that might not help.

We are still trying to find what part of the savepoint is causing the error. We 
will try removing pieces of the job graph until we are able to savepoint.

From: Yun Tang 
Date: Tuesday, June 14, 2022 at 9:15 AM
To: Martijn Visser , Mike Barborak 
Cc: user@flink.apache.org 
Subject: Re: NegativeArraySizeException trying to take a savepoint
Hi Mike,

I think the root cause is that the size of the Java byte array still exceeds the
VM limit.
The exception message is not friendly and is not covered by the sanity check [1],
as it uses a different code path [2]:
the native method org.rocksdb.RocksIterator.$$YJP$$value0 allocates the
byte array directly without a check.

If you want to work around the problem, please consider reducing the size of the
values passed to listState#add so that no single value becomes too large.
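One possible shape of such a workaround (a sketch only: keep the pieces in a MapState keyed by a running chunk index, since with the RocksDB backend each map entry is stored as its own key/value, while ListState elements are merged into one RocksDB value; `chunkState` and `storeChunked` are hypothetical names):

// Sketch: split a large payload into bounded entries instead of appending it to a ListState.
// `chunkState` would be declared with a MapStateDescriptor<Integer, byte[]>.
void storeChunked(MapState<Integer, byte[]> chunkState, byte[] bigValue) throws Exception {
    final int maxChunk = 1 << 20; // 1 MiB per entry, as an example
    int idx = 0;
    for (int offset = 0; offset < bigValue.length; offset += maxChunk) {
        int end = Math.min(offset + maxChunk, bigValue.length);
        chunkState.put(idx++, java.util.Arrays.copyOfRange(bigValue, offset, end));
    }
}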



[1] 
https://github.com/facebook/rocksdb/pull/3850
[2] 
https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/rocksjni/iterator.cc#L239-L245

Best
Yun Tang


From: Martijn Visser 
Sent: Monday, June 13, 2022 21:47
To: Mike Barborak 
Cc: user@flink.apache.org 
Subject: Re: NegativeArraySizeException trying to take a savepoint

Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since Flink 
bumped to a newer version of RocksDB in that version. Is that a possibility for 
you?

Best regards,

Martijn

On Mon, Jun 13, 2022 at 15:21, Mike Barborak 
mailto:mi...@ec.ai>>:

When trying to savepoint our job, we are getting the stack trace below. Is 
there a way to know more about this failure? Like which function in the job 
graph is associated with the problematic state and which key (assuming it is 
keyed state)?



Or is there a fix for this exception? The only mention of this exception that I 
can find is in [1] and [2]. [1] has a message at the bottom saying that the 
issue was fixed in RocksDb in 2018. And while we do have a part of the job 
graph that matches the pattern discussed in these two links, our attempts to 
reproduce the problem by pumping messages through at a rate millions of times 
higher than normal have not worked.



We are using Flink version 1.13.5.



Thanks,

Mike



[1] 
https://issues.apache.org/jira/browse/FLINK-9268

[2] 
https://www.mail-archive.com/user@flink.apache.org/msg34915.html



Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for 
operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka sink 
to ec.platform.braid.responses-rtw (9/15)#0.

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)

... 4 more

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException: -785722504

at java.base/java.util.concurrent.FutureTask.report(Unknown 
Source)

at java.base/java.util.concurrent.FutureTask.get(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)

at 

Re: Kafka Consumer commit error

2022-06-14 Thread Martijn Visser
Hi Christian,

There's another similar error reported by someone else. I've linked the
tickets together and asked one of the Kafka maintainers to have a look at
this.

Best regards,

Martijn

On Tue, Jun 14, 2022 at 17:16, Christian Lorenz <
christian.lor...@mapp.com>:

> Hi Alexander,
>
>
>
> I’ve created a Jira ticket here
> https://issues.apache.org/jira/browse/FLINK-28060.
>
> Unfortunately this is causing some issues to us.
>
> I hope with the attached demo project the root cause of this can also be
> determined, as this is reproducible in Flink 1.15.0, but not in Flink
> 1.14.4.
>
>
>
> Kind regards,
>
> Christian
>
>
>
> *From:* Alexander Fedulov 
> *Date:* Monday, June 13, 2022 at 23:42
> *To:* Christian Lorenz 
> *Cc:* "user@flink.apache.org" 
> *Subject:* Re: Kafka Consumer commit error
>
>
>
> This email has reached Mapp via an external source
>
>
>
> Hi Christian,
>
>
>
> thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this
> application. Do you think this might still be related?
>
>
>
> No, in that case, Kafka transactions are not used, so it should not be
> relevant.
>
>
>
> Best,
>
> Alexander Fedulov
>
>
>
> On Mon, Jun 13, 2022 at 3:48 PM Christian Lorenz <
> christian.lor...@mapp.com> wrote:
>
> Hi Alexander,
>
>
>
> thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this
> application. Do you think this might still be related?
>
>
>
> Best regards,
>
> Christian
>
>
>
>
>
> *From:* Alexander Fedulov 
> *Date:* Monday, June 13, 2022 at 13:06
> *To:* "user@flink.apache.org" 
> *Cc:* Christian Lorenz 
> *Subject:* Re: Kafka Consumer commit error
>
>
>
> This email has reached Mapp via an external source
>
>
>
> Hi Christian,
>
>
>
> you should check if the exceptions that you see after the broker is back
> from maintenance are the same as the ones you posted here. If you are using
> EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging
> transactions that Flink attempts to commit [1].
>
>
>
> Best,
>
> Alexander Fedulov
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance
>
>
>
> On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser 
> wrote:
>
> Hi Christian,
>
>
>
> I would expect that after the broker comes back up and recovers
> completely, these error messages would disappear automagically. It should
> not require a restart (only time). Flink doesn't rely on Kafka's
> checkpointing mechanism for fault tolerance.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Wed, Jun 8, 2022 at 15:49, Christian Lorenz <
> christian.lor...@mapp.com>:
>
> Hi,
>
>
>
> we have some issues with a job using the flink-sql-connector-kafka (flink
> 1.15.0/standalone cluster). If one broker e.g. is restarted for
> maintainance (replication-factor=2), the taskmanagers executing the job are
> constantly logging errors on each checkpoint creation:
>
>
>
> Failed to commit consumer offsets for checkpoint 50659
>
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
> Offset commit failed with a retriable exception. You should retry
> committing the latest consumed offsets.
>
> Caused by:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
> The coordinator is not available.
>
>
>
> AFAICT the error itself is produced by the underlying kafka consumer.
> Unfortunately this error cannot be reproduced on our test system.
>
> From my understanding this error might occur once, but follow up
> checkpoints / kafka commits should be fine again.
>
> Currently my only way of “fixing” the issue is to restart the taskmanagers.
>
>
>
> Is there maybe some kafka consumer setting which would help to circumvent
> this?
>
>
>
> Kind regards,
>
> Christian
>
> Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63,
> 80335 München.
> Registered with the District Court München HRB 226181
> Managing Directors: Frasier, Christopher & Warren, Steve
>
> This e-mail is from Mapp Digital and its international legal entities and
> may contain information that is confidential or proprietary.
> If you are not the intended recipient, do not read, copy or distribute the
> e-mail or any attachments. Instead, please notify the sender and delete the
> e-mail and any attachments.
> Please consider the environment before printing. Thank you.
>

Re: Kafka Consumer commit error

2022-06-14 Thread Christian Lorenz
Hi Alexander,

I’ve created a Jira ticket here 
https://issues.apache.org/jira/browse/FLINK-28060.
Unfortunately this is causing some issues to us.
I hope with the attached demo project the root cause of this can also be 
determined, as this is reproducible in Flink 1.15.0, but not in Flink 1.14.4.

Kind regards,
Christian

From: Alexander Fedulov 
Date: Monday, June 13, 2022 at 23:42
To: Christian Lorenz 
Cc: "user@flink.apache.org" 
Subject: Re: Kafka Consumer commit error

This email has reached Mapp via an external source

Hi Christian,

thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this 
application. Do you think this might still be related?

No, in that case, Kafka transactions are not used, so it should not be relevant.

Best,
Alexander Fedulov

On Mon, Jun 13, 2022 at 3:48 PM Christian Lorenz 
mailto:christian.lor...@mapp.com>> wrote:
Hi Alexander,

thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this 
application. Do you think this might still be related?

Best regards,
Christian


From: Alexander Fedulov mailto:alexan...@ververica.com>>
Date: Monday, June 13, 2022 at 13:06
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Cc: Christian Lorenz 
mailto:christian.lor...@mapp.com>>
Subject: Re: Kafka Consumer commit error

This email has reached Mapp via an external source

Hi Christian,

you should check if the exceptions that you see after the broker is back from 
maintenance are the same as the ones you posted here. If you are using 
EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging 
transactions that Flink attempts to commit [1].

Best,
Alexander Fedulov

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance

On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser 
mailto:martijnvis...@apache.org>> wrote:
Hi Christian,

I would expect that after the broker comes back up and recovers completely, 
these error messages would disappear automagically. It should not require a 
restart (only time). Flink doesn't rely on Kafka's checkpointing mechanism for 
fault tolerance.

Best regards,

Martijn

On Wed, Jun 8, 2022 at 15:49, Christian Lorenz 
mailto:christian.lor...@mapp.com>>:
Hi,

we have some issues with a job using the flink-sql-connector-kafka (flink 
1.15.0/standalone cluster). If one broker e.g. is restarted for maintainance 
(replication-factor=2), the taskmanagers executing the job are constantly 
logging errors on each checkpoint creation:

Failed to commit consumer offsets for checkpoint 50659
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
 Offset commit failed with a retriable exception. You should retry committing 
the latest consumed offsets.
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
 The coordinator is not available.

AFAICT the error itself is produced by the underlying kafka consumer. 
Unfortunately this error cannot be reproduced on our test system.
From my understanding this error might occur once, but follow up checkpoints / 
kafka commits should be fine again.
Currently my only way of “fixing” the issue is to restart the taskmanagers.

Is there maybe some kafka consumer setting which would help to circumvent this?

Kind regards,
Christian
Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63, 80335 
München.
Registered with the District Court München HRB 226181
Managing Directors: Frasier, Christopher & Warren, Steve
This e-mail is from Mapp Digital and its international legal entities and may 
contain information that is confidential or proprietary.
If you are not the intended recipient, do not read, copy or distribute the 
e-mail or any attachments. Instead, please notify the sender and delete the 
e-mail and any attachments.
Please consider the environment before printing. Thank you.

Re: Suspecting that a method in the source code is never-reached code

2022-06-14 Thread Jing Ge
Hi,

A friendly reminder: when you open the ticket and for the later discussion on the dev list, remember to use English.

Best,
Jing


On Tue, Jun 14, 2022 at 3:23 PM Yun Tang  wrote:

> Hi YuFeng,
>
> I think your analysis is correct. You can create a ticket to fix this issue. Also, for detailed discussion of the code implementation, I suggest using the dev mailing list.
>
> Best,
> Yun Tang
> 
> From: 朱育锋 
> Sent: Tuesday, June 14, 2022 19:33
> To: user-zh@flink.apache.org 
> Subject: Suspecting that a method in the source code is never-reached code
>
> Hello Everyone
>
>
> While reading the code of the ProcessMemoryUtils class, I suspect that the main logic of the sanityCheckTotalProcessMemory method [1] can never execute:
>
> 1.
> In deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory, the code checks whether TotalProcessMemory was explicitly configured [2].
> 2.
> The false branch (i.e. TotalProcessMemory was NOT explicitly configured) calls sanityCheckTotalProcessMemory, but the main logic of sanityCheckTotalProcessMemory
>
> only runs when TotalProcessMemory IS explicitly configured [3], so the main logic of sanityCheckTotalProcessMemory should never execute.
>
>
> Looking at where sanityCheckTotalFlinkMemory in TaskExecutorFlinkMemoryUtils is called [4][5] (its logic is similar to sanityCheckTotalProcessMemory: both compare the derived memory size with the explicitly configured one),
>
> I suspect that sanityCheckTotalProcessMemory should be placed after the if branch in deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory rather than inside it.
>
> It may also be that I do not understand this code well enough and have missed the intent behind writing it this way; I would appreciate a confirmation from the experts.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
> >
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
> >
> [3]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
> >
> [4]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
> >
> [5]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
> >
>
> Best regards
> YuFeng
>


New KafkaSource API : Change in default behavior regarding starting offset

2022-06-14 Thread bastien dine
Hello everyone,

Does someone know why the starting offset behaviour has changed in the new
Kafka Source ?

This is now from earliest (code in KafkaSourceBuilder), doc says :
"If offsets initializer is not specified, OffsetsInitializer.earliest() will
be used by default." from :
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset

Before in old FlinkKafkaConsumer it was from committed offset (i.e :
setStartFromGroupOffsets()
method)

which match with this behaviour in new KafkaSource :   : OffsetsInitializer.
committedOffsets(OffsetResetStrategy.EARLIEST

This change can lead to big trouble if users pay no attention to this point
when migrating from the old KafkaConsumer to the new KafkaSource.

Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: Flink running same task on different Task Manager

2022-06-14 Thread Weihua Hu
Hi,

IMO, broadcast is the better way to do this, since it can reduce the QPS of
the external accesses.
If you do not want to use broadcast, try using a RichFunction: start a thread
in the open() method to refresh the data regularly, but be careful to clean
up your data and threads in the close() method, otherwise it will lead to
leaks.
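A rough sketch of the second option (types simplified to String and the HTTPS call left as a placeholder):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch: each parallel instance loads the reference data itself and refreshes it every hour.
public class EnrichFunction extends RichMapFunction<String, String> {

    private transient volatile Map<String, String> referenceData;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) {
        referenceData = new ConcurrentHashMap<>(fetchOverHttps());   // initial load
        refresher = Executors.newSingleThreadScheduledExecutor();
        refresher.scheduleAtFixedRate(
                () -> referenceData = new ConcurrentHashMap<>(fetchOverHttps()),
                1, 1, TimeUnit.HOURS);
    }

    @Override
    public String map(String key) {
        return referenceData.getOrDefault(key, "");                  // validate/transform here
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdownNow();                                  // avoid leaking the thread
        }
    }

    private Map<String, String> fetchOverHttps() {
        return Map.of();                                              // placeholder for the HTTPS call
    }
}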

Best,
Weihua


On Tue, Jun 14, 2022 at 12:04 AM Great Info  wrote:

> Hi,
> I have one Flink job which has two tasks:
> Task1 - sources some static data over HTTPS and keeps it in memory,
> refreshing it every hour.
> Task2 - processes real-time events from Kafka, uses the static data to
> validate and transform them, then forwards them to another Kafka topic.
>
> So far everything was running on the same task manager (same node), but
> due to some recent scaling requirements I need to enable partitioning on
> Task2, and that will make some partitions run on other task managers. But
> the other task managers don't have the static data.
>
> Is there a way to run Task1 on all the task managers? I don't want to
> use broadcasting since the data set is fairly large, and I also cannot persist
> the data in a DB due to data regulations.
>
>


Re: Suspecting that a method in the source code is never-reached code

2022-06-14 Thread Yun Tang
Hi YuFeng,

I think your analysis is correct. You can create a ticket to fix this issue. Also, for detailed discussion of the code implementation, I suggest using the dev mailing list.

Best,
Yun Tang

From: 朱育锋 
Sent: Tuesday, June 14, 2022 19:33
To: user-zh@flink.apache.org 
Subject: Suspecting that a method in the source code is never-reached code

Hello Everyone

While reading the code of the ProcessMemoryUtils class, I suspect that the main logic of the sanityCheckTotalProcessMemory method [1] can never execute:

1. In deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory, the code checks whether TotalProcessMemory was explicitly configured [2].
2. The false branch (i.e. TotalProcessMemory was NOT explicitly configured) calls sanityCheckTotalProcessMemory, but the main logic of sanityCheckTotalProcessMemory
only runs when TotalProcessMemory IS explicitly configured [3], so the main logic of sanityCheckTotalProcessMemory should never execute.

Looking at where sanityCheckTotalFlinkMemory in TaskExecutorFlinkMemoryUtils is called [4][5] (its logic is similar to sanityCheckTotalProcessMemory: both compare the derived memory size with the explicitly configured one),
I suspect that sanityCheckTotalProcessMemory should be placed after the if branch in deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory rather than inside it.

It may also be that I do not understand this code well enough and have missed the intent behind writing it this way; I would appreciate a confirmation from the experts.

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
 

[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
 

[3] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
 

[4] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
 

[5] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
 


Best regards
YuFeng


Re: NegativeArraySizeException trying to take a savepoint

2022-06-14 Thread Yun Tang
Hi Mike,

I think the root cause is that the size of the Java byte array still exceeds the
VM limit.
The exception message is not friendly and is not covered by the sanity check [1],
as it uses a different code path [2]:
the native method org.rocksdb.RocksIterator.$$YJP$$value0 allocates the
byte array directly without a check.

If you want to work around the problem, please consider reducing the size of the
values passed to listState#add so that no single value becomes too large.



[1] https://github.com/facebook/rocksdb/pull/3850
[2] 
https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/rocksjni/iterator.cc#L239-L245

Best
Yun Tang


From: Martijn Visser 
Sent: Monday, June 13, 2022 21:47
To: Mike Barborak 
Cc: user@flink.apache.org 
Subject: Re: NegativeArraySizeException trying to take a savepoint

Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since Flink 
bumped to a newer version of RocksDB in that version. Is that a possibility for 
you?

Best regards,

Martijn

On Mon, Jun 13, 2022 at 15:21, Mike Barborak 
mailto:mi...@ec.ai>>:

When trying to savepoint our job, we are getting the stack trace below. Is 
there a way to know more about this failure? Like which function in the job 
graph is associated with the problematic state and which key (assuming it is 
keyed state)?



Or is there a fix for this exception? The only mention of this exception that I 
can find is in [1] and [2]. [1] has a message at the bottom saying that the 
issue was fixed in RocksDb in 2018. And while we do have a part of the job 
graph that matches the pattern discussed in these two links, our attempts to 
reproduce the problem by pumping messages through at a rate millions of times 
higher than normal have not worked.



We are using Flink version 1.13.5.



Thanks,

Mike



[1] https://issues.apache.org/jira/browse/FLINK-9268

[2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html



Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for 
operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka sink 
to ec.platform.braid.responses-rtw (9/15)#0.

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)

... 4 more

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException: -785722504

at java.base/java.util.concurrent.FutureTask.report(Unknown 
Source)

at java.base/java.util.concurrent.FutureTask.get(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)

... 3 more

Caused by: java.lang.NegativeArraySizeException: -785722504

at org.rocksdb.RocksIterator.$$YJP$$value0(Native Method)

at org.rocksdb.RocksIterator.value0(RocksIterator.java)

at org.rocksdb.RocksIterator.value(RocksIterator.java:50)

at 
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:103)

at 
org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator.value(RocksSingleStateIterator.java:66)

at 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:202)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:210)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)

at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)

at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)

at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)

at java.base/java.util.concurrent.FutureTask.run(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)


context.timestamp null in keyedprocess function

2022-06-14 Thread bat man
Hi,

We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and
an enrichment stream from another topic.
We extend AssignerWithPeriodicWatermarks to assign watermarks, extract
the timestamp from the event, and handle idle source partitions.
AutoWatermarkInterval is set to 5000L.
 The timestamp extractor looks like below -

@Override
public long extractTimestamp(Raw event, long previousElementTimestamp) {
    lastRecordProcessingTime = System.currentTimeMillis();
    // the raw event timestamp is treated as seconds and converted to milliseconds
    long eventTime = (long) Double.parseDouble(event.getTimestamp().toString());
    long timestamp = Instant.ofEpochMilli(eventTime * 1_000).toEpochMilli();
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    return timestamp;
}

In the second step the rules are joined to the events; this is done in a
KeyedProcessFunction.
What we have observed is that, at times when the job starts consuming from
the beginning of the event source stream, the timestamp accessed in
the KeyedProcessFunction via context.timestamp() comes back as null and the
code throws an NPE.
This happens only for some records, intermittently, and when we process the
same event in another environment it is handled fine, which means the
event is being parsed correctly.

Does anyone have an idea what the issue could be? As far as the timestamp
goes, it could only be null if the timestamp extractor returned null.
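While debugging, a small defensive sketch like this may help narrow it down (Event and Output stand for our actual types; the processing-time fallback is just one option, a side output or a counter would work as well):

@Override
public void processElement(Event value, Context ctx, Collector<Output> out) throws Exception {
    Long ts = ctx.timestamp();   // documented to possibly be null when the record carries no event-time timestamp
    if (ts == null) {
        // fall back to processing time instead of hitting an NPE downstream
        ts = ctx.timerService().currentProcessingTime();
    }
    // ... join the rules to the event using ts ...
}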

Thanks.


Re:Re:Re: Re: Flink interval join data loss question

2022-06-14 Thread lxk
Hi,
  I am now using a SQL interval join with the bounds widened to the minute level, -2 minutes and +4 minutes. The row count is now close to what the inner
join produces. The code is below:
Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream,   
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS 
TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
.build());
Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream, 
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS 
TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
.build());


streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);
Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from item JOIN header on header.id = item.order_id and item.rowtime BETWEEN 
header.rowtime - INTERVAL '2' MINUTE AND header.rowtime + INTERVAL '4' MINUTE");

  This raises a new question for me: I set the state TTL for the SQL inner join to 20s, so why is the data retained by a 20s TTL
more accurate than the interval join with minute-level bounds? I'd like to hear everyone's thoughts on this.
  My guess is that the TTL I set on the tables did not take effect, so the inner join keeps the full data in state, which is why its result
is more accurate than the interval join. To check this I went through the TM and JM logs and did not see any log entry for the table configuration I set. My configuration is as follows:
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15 s");
conf.setString("table.exec.mini-batch.size","100");
conf.setString("table.exec.state.ttl","20 s");
env.configure(conf);
StreamTableEnvironment streamTableEnvironment = 
StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));


I would like to know: do the TM and JM logs correctly reflect whether my configuration took effect? If not, how can I find out whether this configuration is actually in effect?
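One way I could check this from inside the job (just a sketch that prints the effective table configuration at submission time; whether the join operator really applies the TTL still has to be verified separately, e.g. via the state size):

// Sketch: dump the effective table configuration; "table.exec.state.ttl" should show up as "20 s".
streamTableEnvironment.getConfig().getConfiguration().toMap()
        .forEach((key, value) -> System.out.println(key + " = " + value));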





















On 2022-06-13 21:12:48, "Xuyang" wrote:
>Hi,
>  1. In theory the inner join should match more data than the interval join. As for the two streams progressing at different speeds,
> that should not be a big problem, because expired data in state is only cleared after the watermarks from both sides have arrived.
>  2. Without a watermark, the inner join emits a record as soon as it arrives, based purely on the time the record enters the operator,
> i.e. processing time. By default nothing expires and the full data set is kept in state. If a TTL is defined, it depends on the primary keys and join keys
> of the two tables; different state layouts (ListState, MapState, etc.) are used to store the data.
> In principle, data that has not been updated for the configured period is cleared, and this is measured in processing time.
>
>
>If anything I said is wrong, please correct me.
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>On 2022-06-12 14:39:39, "lxk7...@163.com" wrote:
>>Thanks a lot for the reply.
>>1. Regarding the watermark, I will run more tests. I will also test whether the interval join still loses data when using processing time.
>>2. Regarding the interval join, my understanding was that it can match a wider range of data than the inner
>>join, so the result should be more accurate; but the result showed data loss, which really surprised me and somewhat overturned my understanding. I also have a new guess: the two streams have different volumes, which might cause data loss as well. The left stream is order-level data and the right stream is order-item-level data, which is much larger. My understanding is that processing the right stream is a bit slower, so the time progress on the two sides may differ. But that raises a new question: the inner
>> join should be affected in the same way.
>>3. One more thing I may not have explained clearly: I use an inner join in SQL
>>without registering a watermark, so the join of the two streams should be defined in processing time? Is the expiration of the table state then also defined in processing time?
>>
>>
>>
>>lxk7...@163.com
>> 
>>From: Shengkai Fang
>>Sent: 2022-06-11 20:35
>>To: user-zh
>>Subject: Re: Re: Flink interval join data loss question
>>hi,
>> 
>>On the first point: there are many ways to lose data. First, confirm whether it is really the JOIN operator that drops the data (incorrect use of the SINK can also drop data). If it is clearly the join
>>operator, I suggest working out what the dropped records look like: is the watermark configured unreasonably, so that records are mistakenly treated as late and dropped? For example, here `event
>>time` = `rowtime` - 2s; is that appropriate? I seem to remember it is usually +2s.
>> 
>>On the second point: my preliminary understanding of the interval join is that state cleanup is driven by the event time of both sides, i.e. advancing the event
>>time of the right stream affects the cleanup of the left stream's data. For example, if the right stream reaches 12:00 and the join condition requires the left stream's time to be within 1h of the right stream's time, then all left-stream
>>data before 11:00 can be cleaned up.
>> 
>>On the third point: I don't think it can. The current inner join + state cleanup cannot cover an event-time window join.
>> 
>>best,
>>Shengkai
>> 
>>lxk7...@163.com wrote on Fri, Jun 10, 2022 at 23:03:
>> 
>>> I still have big doubts about this issue, so let me describe my scenario again:
>>>
>>> We are using Flink for a two-stream join. One stream is the order header table and the other is the order detail table. We examined the offline data: the order detail records are generally generated within a few seconds after the order header records, so the gap is at the second level.
>>> We ran the following rounds of tests and compared the row counts against another table that is written in real time. (That table is the baseline reference: it is simply persisted without any processing, the data sources on both sides are the same, and the comparison criteria are the same.)
>>> 1. DataStream API, using Kafka's built-in timestamps as watermarks, with an interval join. After comparison, data was missing.
>>> 2. Stream-to-table conversion, SQL inner join, no watermark set. The compared result was correct.
>>> 3. Stream-to-table conversion, SQL interval join, extracting the watermark from the event time in the data. After comparison, data was missing.
>>>  From these results I don't quite understand why the SQL inner join keeps the data accurate while the interval
>>> join does not. Is there a good method or line of thinking that would help me understand the cause of this problem?
>>>
>>> For the second approach, my question is: since no watermark is set in the SQL, is the table state expiration measured in processing time? For a join with a table-state TTL configured like this, can I understand this inner
>>> join as effectively a window join?
>>>
>>>
>>>
>>> lxk7...@163.com
>>>
>>> From: lxk
>>> Sent: 2022-06-10 18:18
>>> To: user-zh
>>> Subject: Re:Re:Re:Re:Re:Re:Re: Flink interval join data loss question
>>>
>>>
>>>
>>> I have now switched to a SQL interval join; the code and execution plan are below. The rest of the configuration is unchanged, and the row count is still low, while the inner join has no such problem.
>>>
>>>
>>>
>>>
>>> Table headerTable =
>>> streamTableEnvironment.fromDataStream(headerFilterStream,
>>>  Schema.newBuilder()
>>> .columnByExpression("rowtime",
>>> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
>>> .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
>>> .build());
>>> Table itemTable =
>>> streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder()
>>> .columnByExpression("rowtime",
>>> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
>>> .watermark("rowtime", 

Re: How to handle deletion of items using PyFlink SQL?

2022-06-14 Thread John Tipper
Yes, I’m interested in the best pattern to follow with SQL to allow for a 
downstream DB using the JDBC SQL connector to reflect the state of rows added 
and deleted upstream.

So imagine there is a crawl event at t=C1 that happens with an associated 
timestamp and which finds resources A,B,C. Is it better to emit one event into 
the stream with an array of all resources or many events, each with one 
resource and a corresponding crawl timestamp. There is obviously a limit to the 
amount of data that can be in a single event so the latter pattern will scale 
better for many resources.

Flink SQL sees this stream and processes it, then emits to a JDBC sink where 
there is one row for A, B, C.

Later, at t=C2, another crawl happens, finding A, B, D. I want the sink DB to 
have 3 rows if possible and not have C. Alternatively it should have 4 rows 
with a tombstone/delete marker on row C so it’s obvious it doesn’t exist any 
more.

I’m interested in a SQL solution if possible.

J

Sent from my iPhone

On 9 Jun 2022, at 11:20, Xuyang  wrote:



Hi, Dian Fu.

  I think John's requirement is like a CDC source: the source needs the
ability to know which data should be deleted and then notify the framework,
and that is why I recommended that John use a UDTF.


And hi, John.
  I'm not sure this doc [1] is enough. BTW, I think you can also try to
customize a connector [2] in Java that sends `DELETE` RowData downstream and use
it in PyFlink SQL; that may be easier.
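For illustration, emitting a delete from such a Java connector boils down to flagging the row (a sketch; the two-field layout is made up):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

// Sketch: inside the source's emit path, a RowData with RowKind.DELETE tells
// downstream operators and sinks to retract this row.
RowData delete = GenericRowData.ofKind(
        RowKind.DELETE,
        StringData.fromString("R2"),          // resource id (illustrative)
        StringData.fromString("2022-06-08")); // crawl timestamp (illustrative)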


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/udfs/python_udfs/#table-functions

[2] 
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/#user-defined-sources--sinks


--

Best!
Xuyang


在 2022-06-09 08:53:36,"Dian Fu"  写道:

Hi John,

If you are using Table API & SQL, the framework is handling the RowKind and 
it's transparent for you. So usually you don't need to handle RowKind in Table 
API & SQL.

Regards,
Dian

On Thu, Jun 9, 2022 at 6:56 AM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
Hi Xuyang,

Thank you very much, I’ll experiment tomorrow. Do you happen to know whether 
there is a Python example of udtf() with a RowKind being set (or whether it’s 
supported)?

Many thanks,

John

Sent from my iPhone

On 8 Jun 2022, at 16:41, Xuyang mailto:xyzhong...@163.com>> 
wrote:


Hi, John.
What about using a UDTF [1]?
In your UDTF, all resources are saved as a set or map, call it s1. When t=2 arrives,
the new resources s2 will be collected by the crawl. I think what you want is
the deleted data, i.e. 's1' - 's2'.
So just loop to find the deleted data and send RowData from the UDTF's
'eval' function; the RowData can be sent with RowKind 'DELETE' [2]. The
'DELETE' tells the downstream that this value has been deleted.

I will be glad if it can help you.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] 
https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52



--

Best!
Xuyang


At 2022-06-08 20:06:17, "John Tipper" 
mailto:john_tip...@hotmail.com>> wrote:

Hi all,

I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R2 were emitted into the upstream stream.  How 
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John


Suspecting that a method in the source code is never-reached code

2022-06-14 Thread 朱育锋
Hello Everyone

While reading the code of the ProcessMemoryUtils class, I suspect that the main logic of the sanityCheckTotalProcessMemory method [1] can never execute:

1. In deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory, the code checks whether TotalProcessMemory was explicitly configured [2].
2. The false branch (i.e. TotalProcessMemory was NOT explicitly configured) calls sanityCheckTotalProcessMemory, but the main logic of sanityCheckTotalProcessMemory
only runs when TotalProcessMemory IS explicitly configured [3], so the main logic of sanityCheckTotalProcessMemory should never execute.

Looking at where sanityCheckTotalFlinkMemory in TaskExecutorFlinkMemoryUtils is called [4][5] (its logic is similar to sanityCheckTotalProcessMemory: both compare the derived memory size with the explicitly configured one),
I suspect that sanityCheckTotalProcessMemory should be placed after the if branch in deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory rather than inside it.

It may also be that I do not understand this code well enough and have missed the intent behind writing it this way; I would appreciate a confirmation from the experts.
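To make the suspicion concrete, here is a simplified stand-alone illustration (NOT the real Flink code, only the shape of the two methods as I read them; the numbered links below point at the actual source):

import java.util.Optional;

// Simplified illustration of the suspected dead branch.
public class DeadBranchSketch {

    static void deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(Optional<Long> explicitTotalProcessMemory) {
        if (explicitTotalProcessMemory.isPresent()) {
            // derive metaspace/overhead from the explicitly configured total process memory
        } else {
            // false branch: the option is NOT configured, yet the sanity check is only called here ...
            sanityCheckTotalProcessMemory(explicitTotalProcessMemory, 1024L);
        }
    }

    static void sanityCheckTotalProcessMemory(Optional<Long> explicitTotalProcessMemory, long derived) {
        // ... and its body only runs when the option IS configured, so it can never fire.
        if (explicitTotalProcessMemory.isPresent() && explicitTotalProcessMemory.get() != derived) {
            throw new IllegalArgumentException("configured and derived total process memory differ");
        }
    }

    public static void main(String[] args) {
        deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(Optional.empty()); // the check never triggers
    }
}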

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
 

[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
 

[3] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
 

[4] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
 

[5] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
 


Best regards
YuFeng

Re: Flink operator deletes the FlinkDeployment after a while

2022-06-14 Thread Gyula Fóra
Hi Sigalit,

This could be related to https://issues.apache.org/jira/browse/FLINK-27889
We have fixed this issue already (after the release); you could simply use
the latest operator image from release-1.0:

ghcr.io/apache/flink-kubernetes-operator:cc8207c

In any case there probably was some error during the initial deployment of
your job, and the operator could not record the deployment information in
the CR status correctly.
This should not happen normally.

Cheers,
Gyula

On Tue, Jun 14, 2022 at 6:09 AM Sigalit Eliazov  wrote:

> After a few hours of running the job manager and task manager generated by
> the operator,
> I get the following message in the operator log.
> There really wasn't any traffic, and the FlinkDeployment is being deleted.
>
> === Finished metrics report
> ===
> Deleting FlinkDeployment
> 2022-06-14 03:09:51,847 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][ns/job-namel] Error during event processing ExecutionScope{
> resource id: CustomResourceID{name='job-name', namespace='ns'}, version:
> 53138} failed.
> java.lang.RuntimeException: Cannot create observe config before first
> deployment, this indicates a bug.
> at
> org.apache.flink.kubernetes.operator.config.FlinkConfigManager.getObserveConfig(FlinkConfigManager.java:137)
> at
> org.apache.flink.kubernetes.operator.service.FlinkService.cancelJob(FlinkService.java:357)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.shutdown(ApplicationReconciler.java:327)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:56)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:37)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:107)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:59)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:68)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:50)
> at
> io.javaoperatorsdk.operator.api.monitoring.Metrics.timeControllerExecution(Metrics.java:34)
>
> I'm not sure I understand this behaviour.
> Thanks
> Sigalit
>