Re: CleanUpInRocksDbCompactFilter

2023-06-15 Thread Hangxiang Yu
Hi, Patricia.
In my opinion, this parameter balances the trade-off between read/write
performance and storage space utilization (of course, smaller state also
means better performance down the road).

I think the right value of longtimeNumberOfQueries() depends on several
factors, such as the size of your state and the rate of updates to your data.

So just a personal suggestion:

1. If your state size is relatively small and does not update frequently,
you may be able to use a larger value.

2. Otherwise, you could use a smaller value for longtimeNumberOfQueries()
to ensure that the compaction filter is executed more frequently.

3. You could also start with the default value and increase or
decrease it gradually to see whether the performance and storage space
work well for your job.
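For reference, this cleanup strategy is configured through StateTtlConfig; a minimal sketch, where the 7-day TTL and the state name are illustrative and 1000 is the value from the question (also the default number of processed entries between timestamp queries in the Flink docs):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// TTL config: state expires 7 days after it was last written (illustrative).
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(7))
        // During RocksDB compaction, re-query the current timestamp
        // after each 1000 processed state entries.
        .cleanupInRocksdbCompactFilter(1000)
        .build();

// Attach the TTL config to a state descriptor (name/type illustrative).
ValueStateDescriptor<Long> descriptor =
        new ValueStateDescriptor<>("last-seen", Long.class);
descriptor.enableTimeToLive(ttlConfig);
```

A smaller argument means the filter checks expiration timestamps more often (faster cleanup, more overhead); a larger one means cheaper compactions but slower space reclamation, matching the trade-off described above.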

On Fri, Jun 16, 2023 at 8:29 AM patricia lee  wrote:

> Hi,
>
> I am currently migrating our flink project from 1.8 to 1.17.
>
> The cleanUpInRocksDbCompactFilter() now accepts longtimeNumberOfQueries()
> as a parameter. The question is: how would we know the right value? We set it
> to 1000 temporarily; is there a default value to use?
>
>
> Regards,
> Patricia
>


-- 
Best,
Hangxiang.


Re: Store a state at an RDBMS before TTL passes by

2023-06-15 Thread Shammon FY
Hi Anastasios,

What you want sounds like a session window [1]; you can refer to the
doc for more details.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#session-windows
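The session-window suggestion can be sketched as follows (UserEvent, MetricsAggregate, and JdbcRowSink are hypothetical placeholders, and the 30-minute gap is illustrative):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// One result per user session: the window closes once the user has been
// inactive for the gap duration, and the aggregate is emitted once.
DataStream<UserEvent> events = ...; // per-user event stream (placeholder)
events
        .keyBy(e -> e.userId)
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .aggregate(new MetricsAggregate()) // recompute per-user metrics
        .addSink(new JdbcRowSink());       // write one row per closed session
```

With this shape, the single row per session is only written once the session is complete, avoiding the insert-then-repeatedly-update pattern against the RDBMS.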

Best,
Shammon FY

On Thu, Jun 15, 2023 at 10:03 PM Anastasios Makris <
anastasios.makris...@gmail.com> wrote:

> Hi Flink users,
>
> I created a KeyedStream that tracks some metrics for each user of my
> website. Each time a user produces an event, the metrics are recomputed and
> change.
> I would like to keep the outcome of a user's session at an RDBMS, which
> will be a single row.
>
> The first and obvious solution would be to insert the row into the RDBMS and
> then update it each time something new occurs.
>
> I would like to ask if another solution is possible.
> For example, could we maybe schedule to use the RDBMS as a sink of the
> state before its TTL passes by?
>
> Best regards,
> Anastasis
>


CleanUpInRocksDbCompactFilter

2023-06-15 Thread patricia lee
Hi,

I am currently migrating our flink project from 1.8 to 1.17.

The cleanUpInRocksDbCompactFilter() now accepts longtimeNumberOfQueries()
as a parameter. The question is: how would we know the right value? We set it
to 1000 temporarily; is there a default value to use?


Regards,
Patricia


Re: Watermark idleness and alignment - are they exclusive?

2023-06-15 Thread Ken Krugler
I think you’re hitting this issue:

https://issues.apache.org/jira/browse/FLINK-31632 


Fixed in 1.16.2 and 1.17.1.

— Ken


> On Jun 15, 2023, at 1:39 PM, Piotr Domagalski  wrote:
> 
> Hi all!
> 
> We've been experimenting with watermark alignment in Flink 1.15 and observed 
> an odd behaviour that I couldn't find any mention of in the documentation.
> 
> With the following strategy:
> 
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> .withTimestampAssigner((e, t) -> e.timestamp)
> .withIdleness(Duration.ofSeconds(3600))
> .withWatermarkAlignment("group-1", Duration.ofSeconds(15));
> 
> Kafka sources stop consuming completely after 3600s (even when the data is 
> flowing into all the partitions). Is this an expected behaviour? Where could 
> I find more information on this?
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Watermark idleness and alignment - are they exclusive?

2023-06-15 Thread Piotr Domagalski
Hi all!

We've been experimenting with watermark alignment in Flink 1.15 and
observed an odd behaviour that I couldn't find any mention of in the
documentation.

With the following strategy:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
.withTimestampAssigner((e, t) -> e.timestamp)
.withIdleness(Duration.ofSeconds(3600))
.withWatermarkAlignment("group-1", Duration.ofSeconds(15));

Kafka sources stop consuming completely after 3600s (even when the data is
flowing into all the partitions). Is this an expected behaviour? Where
could I find more information on this?

-- 
Piotr Domagalski


Re: (no subject)

2023-06-15 Thread 海风
Thanks a lot!



 Original message 
| From | Weihua Hu |
| Date | 2023-06-14 12:32 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Re: (no subject) |
>
> Does this state variable need to be marked transient?

The ValueState is initialized in the open method of the RichFunction; it should
not be serialized or deserialized, so marking it transient is recommended.
In practice, though, serialization and deserialization of user-defined functions
only happen at job deployment, and the ValueState field is still null at that
point, so leaving out the transient keyword does not have much impact.

> And in which cases does Flink code need transient on a variable, and in
> which cases not?

Once you understand that serialization and deserialization of user-defined
functions happen at job deployment, this question is easy to answer: if your
variable is initialized inside the function's open method, you should add the
transient keyword to indicate that the field does not need to participate in
serialization.
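The advice above (initialize state in open() and mark the field transient) can be sketched as follows; the counting logic is illustrative:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<String, Tuple2<String, Long>> {

    // transient: initialized in open() on the task manager, so the field
    // does not need to take part in serializing the function instance.
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String key, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = countState.value();           // null on first access
        long next = (current == null ? 0L : current) + 1;
        countState.update(next);
        out.collect(Tuple2.of(key, next));
    }
}
```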


Best,
Weihua


On Tue, Jun 13, 2023 at 1:10 PM Paul <18751805...@163.com> wrote:

> In a Flink process function I define a state variable, e.g. private ValueState
> vs; does this state variable need to be marked transient, and why? Also, in
> which cases does Flink code need transient on a variable, and in which cases
> not? Please advise.
>
>
>


Store a state at an RDBMS before TTL passes by

2023-06-15 Thread Anastasios Makris
Hi Flink users,

I created a KeyedStream that tracks some metrics for each user of my
website. Each time a user produces an event, the metrics are recomputed and
change.
I would like to keep the outcome of a user's session at an RDBMS, which
will be a single row.

The first and obvious solution would be to insert the row into the RDBMS and
then update it each time something new occurs.

I would like to ask if another solution is possible.
For example, could we maybe schedule to use the RDBMS as a sink of the
state before its TTL passes by?

Best regards,
Anastasis


Re: AsyncFunction vs Async Sink

2023-06-15 Thread Teoh, Hong
Hi Lu,

> 1. Is there any problem if we use an AsyncFunction for such a use case? We can
> simply drop the output and use Unordered mode.


As far as I can tell, they are similar, apart from the retry strategy available
for AsyncFunction and the batching for Async Sink. Both should work in Flink.


> 2. For AsyncFunction and Async Sink, does it make sense that both could
> share the same underlying implementation, so that features like batching and
> rate limiting can benefit both?

Good question - I think there are quite a lot of similarities; that's why the
interfaces are similar. However, I think the end use case is different. For
example, Async Sink might want to implement support for some form of
two-phase commit on the sink (at-least-once guarantee). This is slightly more
complicated for AsyncFunction.



Regards,
Hong



On 15 Jun 2023, at 00:26, Lu Niu  wrote:


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Thanks, Hong!

I understand that if the use case is to simply write something to an external
service, Async Sink is a good option that provides features like batching,
state management and rate limiting. I have some follow-up questions:

1. Is there any problem if we use an AsyncFunction for such a use case? We can
simply drop the output and use Unordered mode.
2. For AsyncFunction and Async Sink, does it make sense that both could share
the same underlying implementation, so that features like batching and rate
limiting can benefit both?

Best
Lu


On Wed, Jun 14, 2023 at 2:20 PM Teoh, Hong 
mailto:lian...@amazon.co.uk>> wrote:
Hi Lu,

Thanks for your question. See below for my understanding.

I would recommend using the Async Sink if you are writing to the external 
service as the final output of your job graph, and if you don’t have the 
ordered requirement that updates to the external system must be done before 
updates to some other external system within the same job graph. (More 
explained later).

The abstraction of the Async Sink is a sink, meaning it is a terminal operator 
in the job graph. The abstraction is intended to simplify the writing of a sink 
- meaning the base implementation will handle batching, state management and 
rate limiting. You only need to provide the client and request structure to be 
used to interact with the external service. This makes writing and maintaining 
the sink easier (if you simply want to write to a destination with at least 
once processing).

The AsyncFunction, as I understand it, is more used for data enrichment and is
not a terminal operator in the job graph. This means the return value from the 
external service will continue to be passed on down the Flink Job graph. This 
is useful for data enrichment using the external service, or if we want to 
ensure the system being called in the AsyncFunction is updated BEFORE any data 
is written to the sinks further down the job graph.

For example:

Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink

We can be sure that the updates to DynamoDB for a particular record happens 
before the record is written to the Kinesis Sink.


Hope the above clarifies your question!

Regards,
Hong


On 14 Jun 2023, at 19:27, Lu Niu mailto:qqib...@gmail.com>> 
wrote:




Hi, Flink dev and users

If I want to async write to an external service, which API shall I use, 
AsyncFunction or Async Sink?

My understanding after checking the code is:

  1.  Both APIs guarantee at-least-once writes to the external service, as both
internally store in-flight requests in the checkpoint.
  2.  Async Sink provides a request-batching feature. This can be implemented
with Map + AsyncFunction: the Map function groups requests into batches and
passes them to the AsyncFunction. The batching implementation can refer to
AbstractMapBundleOperator if you don't want to use state.
  3.  Async Sink supports retrying failed requests. AsyncFunction also supports
retries in the latest Flink version.
  4.  Async Sink supports rate limiting; AsyncFunction doesn't.
  5.  AsyncFunction can be used to implement read-update-write. Async Sink
cannot.

Best

Lu




Re: Interaction between idling sources and watermark alignment

2023-06-15 Thread Teoh, Hong
Hi Alexis, below is my understanding:


> I see that, in Flink 1.17.1, watermark alignment will be supported (as beta) 
> within a single source's splits and across different sources. I don't see 
> this explicitly mentioned in the documentation, but I assume that the concept 
> of "maximal drift" used for alignment also takes idleness into account, 
> resuming any readers that were paused due to an idle split or source. Is my 
> understanding correct?

As far as I understand, the evaluation to “unpause” a given split that might
have been paused due to watermark alignment happens at a fixed interval. [1]

We see that the SourceCoordinator calls announceCombinedWatermark(), which
calculates the global watermark and subsequently sends a
WatermarkAlignmentEvent to each subtask. On each subtask, there is an
evaluation of whether to “wake up” the operator. [2] [3]

This means there is a periodic evaluation of whether to “wake up”,
controlled by the update interval, which defaults to 1s. [4]

> Also, something that isn't 100% clear to me when comparing to the previous 
> watermark alignment documentation, even if I only wanted alignment within a 
> single source's splits, I still need to call withWatermarkAlignment in the 
> watermark strategy, right? Otherwise alignment will not take place, 
> regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.

Yes, this is correct. Watermark groups are used to check whether multiple 
sources need to coordinate watermarks. If two sources A and B both belong to 
the same watermark group, then their watermarks will be aligned.

Hope the above helps.


Cheers,
Hong


[1] 
https://github.com/apache/flink/blob/45ba7ee87caee63a0babfd421b7c5eabaa779baa/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L160
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L556-L559
[3] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L659
[4] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java#L29



On 13 Jun 2023, at 21:08, Alexis Sarda-Espinosa  
wrote:



Hi again, I'm not a fan of bumping questions, but I think this might be 
relevant, maybe enough to include it in the official documentation?

Regards,
Alexis.

On Tue, 30 May 2023, 16:07 Alexis Sarda-Espinosa, 
mailto:sarda.espin...@gmail.com>> wrote:
Hello,

I see that, in Flink 1.17.1, watermark alignment will be supported (as beta) 
within a single source's splits and across different sources. I don't see this 
explicitly mentioned in the documentation, but I assume that the concept of 
"maximal drift" used for alignment also takes idleness into account, resuming 
any readers that were paused due to an idle split or source. Is my 
understanding correct?

Also, something that isn't 100% clear to me when comparing to the previous 
watermark alignment documentation, even if I only wanted alignment within a 
single source's splits, I still need to call withWatermarkAlignment in the 
watermark strategy, right? Otherwise alignment will not take place, regardless 
of pipeline.watermark-alignment.allow-unaligned-source-splits.

Regards,
Alexis.




Re: [DISCUSS] Status of Statefun Project

2023-06-15 Thread Martijn Visser
Let me know if you have a PR for a Flink update :)

On Thu, Jun 8, 2023 at 5:52 PM Galen Warren via user 
wrote:

> Thanks Martijn.
>
> Personally, I'm already using a local fork of Statefun that is compatible
> with Flink 1.16.x, so I wouldn't have any need for a released version
> compatible with 1.15.x. I'd be happy to do the PRs to modify Statefun to
> work with new versions of Flink as they come along.
>
> As for testing, Statefun does have unit tests and Gordon also sent me
> instructions a while back for how to do some additional smoke tests which
> are pretty straightforward. Perhaps he could weigh in on whether the
> combination of automated tests plus those smoke tests should be sufficient
> for testing with new Flink versions (I believe the answer is yes).
>
> -- Galen
>
>
>
> On Thu, Jun 8, 2023 at 8:01 AM Martijn Visser 
> wrote:
>
>> Hi all,
>>
>> Apologies for the late reply.
>>
>> I'm willing to help out with merging requests in Statefun to keep them
>> compatible with new Flink releases and create new releases. I do think
>> that
>> validation of the functionality of these releases depends a lot on those
>> who do these compatibility updates, with PMC members helping out with the
>> formal process.
>>
>> > Why can't the Apache Software Foundation allow community members to
>> bring
>> it up to date?
>>
>> There's nothing preventing anyone from reviewing any of the current PRs or
>> opening new ones. However, none of them are approved [1], so there's also
>> nothing to merge.
>>
>> > I believe that there are people and companies on this mailing list
>> interested in supporting Apache Flink Stateful Functions.
>>
>> If so, then now is the time to show.
>>
>> Would there be a preference to create a release with Galen's merged
>> compatibility update to Flink 1.15.2, or do we want to skip that and go
>> straight to a newer version?
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>>
>> https://github.com/apache/flink-statefun/pulls?q=is%3Apr+is%3Aopen+review%3Aapproved
>>
>> On Tue, Jun 6, 2023 at 3:55 PM Marco Villalobos <
>> mvillalo...@kineteque.com>
>> wrote:
>>
>> > Why can't the Apache Software Foundation allow community members to
>> bring
>> > it up to date?
>> >
>> > What's the process for that?
>> >
>> > I believe that there are people and companies on this mailing list
>> > interested in supporting Apache Flink Stateful Functions.
>> >
>> > You already had two people on this thread express interest.
>> >
>> > At the very least, we could keep the library versions up to date.
>> >
>> > There are only a small list of new features that might be worthwhile:
>> >
>> > 1. event time processing
>> > 2. state rest api
>> >
>> >
>> > On Jun 6, 2023, at 3:06 AM, Chesnay Schepler 
>> wrote:
>> >
>> > If you were to fork it *and want to redistribute it* then the short
>> > version is that
>> >
>> >1. you have to adhere to the Apache licensing requirements
>> >2. you have to make it clear that your fork does not belong to the
>> >Apache Flink project. (Trademarks and all that)
>> >
>> > Neither should be significant hurdles (there should also be plenty of
>> > online resources regarding 1), and if you do this then you can freely
>> share
>> > your fork with others.
>> >
>> > I've also pinged Martijn to take a look at this thread.
>> > To my knowledge the project hasn't decided anything yet.
>> >
>> > On 27/05/2023 04:05, Galen Warren wrote:
>> >
>> > Ok, I get it. No interest.
>> >
>> > If this project is being abandoned, I guess I'll work with my own fork.
>> Is
>> > there anything I should consider here? Can I share it with other people
>> who
>> > use this project?
>> >
>> > On Tue, May 16, 2023 at 10:50 AM Galen Warren 
>> 
>> > wrote:
>> >
>> >
>> > Hi Martijn, since you opened this discussion thread, I'm curious what
>> your
>> > thoughts are in light of the responses? Thanks.
>> >
>> > On Wed, Apr 19, 2023 at 1:21 PM Galen Warren 
>> 
>> > wrote:
>> >
>> >
>> > I use Apache Flink for stream processing, and StateFun as a hand-off
>> >
>> > point for the rest of the application.
>> > It serves well as a bridge between a Flink Streaming job and
>> > micro-services.
>> >
>> > This is essentially how I use it as well, and I would also be sad to see
>> > it sunsetted. It works well; I don't know that there is a lot of new
>> > development required, but if there are no new Statefun releases, then
>> > Statefun can only be used with older Flink versions.
>> >
>> > On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>> >
>> >
>> > I am currently using Stateful Functions in my application.
>> >
>> > I use Apache Flink for stream processing, and StateFun as a hand-off
>> > point for the rest of the application.
>> > It serves well as a bridge between a Flink Streaming job and
>> > micro-services.
>> >
>> > I would be disappointed if StateFun was sunsetted. It's a good idea.
>> >
>> > If there is anything I can do to help, as a contributor perhaps, please

Re: Flink SQL job metric names are too long and blow up Prometheus memory

2023-06-15 Thread daniel sun
Unsubscribe

On Thu, Jun 15, 2023 at 7:23 PM im huzi  wrote:

> Unsubscribe
>
> On Tue, Jun 13, 2023 at 08:51 casel.chen  wrote:
>
> > We run 200+ Flink SQL jobs in production. After wiring up Prometheus
> > metrics (Prometheus scrapes the job metrics periodically), Prometheus blew
> > through its 64 GB of memory shortly after monitoring started; investigation
> > showed the cause was overly long metric names.
> > A Flink SQL job's metric names are generally composed of the job name plus
> > the operator name, and the operator name is assembled from the SQL text, so
> > a SELECT with many fields or a complex statement easily produces very long
> > names. Is there a good way to solve this problem?
>


Re: Flink SQL job metric names are too long and blow up Prometheus memory

2023-06-15 Thread im huzi
Unsubscribe

On Tue, Jun 13, 2023 at 08:51 casel.chen  wrote:

> We run 200+ Flink SQL jobs in production. After wiring up Prometheus metrics
> (Prometheus scrapes the job metrics periodically), Prometheus blew through
> its 64 GB of memory shortly after monitoring started; investigation showed
> the cause was overly long metric names.
> A Flink SQL job's metric names are generally composed of the job name plus
> the operator name, and the operator name is assembled from the SQL text, so a
> SELECT with many fields or a complex statement easily produces very long
> names. Is there a good way to solve this problem?


Re: Re: Kafka transaction issue when Flink writes to Kafka

2023-06-15 Thread xuguang
That was indeed the reason. Learned something, thanks!


On 2023-06-15 16:25:30, "yuanfeng hu" wrote:
>The consumer needs to set the transaction isolation level.
>
>> On Jun 15, 2023 at 16:23, 163 wrote:
>> 
>> As I understand it, Kafka supports transactions: with checkpointing and
>> exactly-once enabled, data should only become readable in Kafka after the
>> checkpoint completes. Test: Flink reads Kafka topic a and writes to topic b,
>> with checkpointing and exactly-once enabled; before Flink completes the next
>> checkpoint, topic b can already consume the new data. What is the reason?
>> Please advise!
>


Re: Kafka transaction issue when Flink writes to Kafka

2023-06-15 Thread yuanfeng hu
The consumer needs to set the transaction isolation level.

> On Jun 15, 2023 at 16:23, 163 wrote:
> 
> As I understand it, Kafka supports transactions: with checkpointing and
> exactly-once enabled, data should only become readable in Kafka after the
> checkpoint completes. Test: Flink reads Kafka topic a and writes to topic b,
> with checkpointing and exactly-once enabled; before Flink completes the next
> checkpoint, topic b can already consume the new data. What is the reason?
> Please advise!
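The fix suggested in this thread is consumer-side configuration: Kafka consumers default to read_uncommitted, so they see records from transactions that the Flink checkpoint has not yet committed. A minimal sketch (broker address and group id are illustrative):

```java
import java.util.Properties;

public class ReadCommittedConfig {

    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
        props.setProperty("group.id", "txn-test");                // illustrative
        // Default is read_uncommitted, which exposes records of open
        // transactions; read_committed hides them until the Flink
        // checkpoint commits the producer transaction.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("isolation.level"));
    }
}
```

With read_committed set, topic b's data only becomes visible to the test consumer once the checkpoint completes, matching the expected exactly-once behavior.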



Kafka transaction issue when Flink writes to Kafka

2023-06-15 Thread 163
As I understand it, Kafka supports transactions: with checkpointing and
exactly-once enabled, data should only become readable in Kafka after the
checkpoint completes. Test: Flink reads Kafka topic a and writes to topic b,
with checkpointing and exactly-once enabled; before Flink completes the next
checkpoint, topic b can already consume the new data. What is the reason?
Please advise!