Participate in the ASF 25th Anniversary Campaign

2024-04-03 Thread Brian Proffitt
Hi everyone,

As part of The ASF’s 25th anniversary campaign[1], we will be celebrating
projects and communities in multiple ways.

We invite all projects and contributors to participate in the following
ways:

* Individuals - submit your first contribution:
https://news.apache.org/foundation/entry/the-asf-launches-firstasfcontribution-campaign
* Projects - share your public good story:
https://docs.google.com/forms/d/1vuN-tUnBwpTgOE5xj3Z5AG1hsOoDNLBmGIqQHwQT6k8/viewform?edit_requested=true
* Projects - submit a project spotlight for the blog:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278466116
* Projects - contact the Voice of Apache podcast (formerly Feathercast) to
be featured: https://feathercast.apache.org/help/
* Projects - use the 25th anniversary template and the #ASF25Years hashtag
on social media:
https://docs.google.com/presentation/d/1oDbMol3F_XQuCmttPYxBIOIjRuRBksUjDApjd8Ve3L8/edit#slide=id.g26b0919956e_0_13

If you have questions, email the Marketing & Publicity team at
mark...@apache.org.

Peace,
BKP

[1] https://apache.org/asf25years/

[NOTE: You are receiving this message because you are a contributor to an
Apache Software Foundation project. The ASF will very occasionally send out
messages relating to the Foundation to contributors and members, such as
this one.]

Brian Proffitt
VP, Marketing & Publicity
VP, Conferences


Data duplicated with s3 file sink

2024-04-03 Thread Vararu, Vadim
Hi all,

I’ve got a Flink job that uses Kinesis as the source and S3 files as the sink. The sink 
rolls at checkpoints and checkpointing itself is configured as 
EXACTLY_ONCE. While running, everything looks good and a new batch of files 
appears on S3 every minute (a checkpoint runs every 60s).
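
For context, a minimal sketch of the checkpointing setup described above (assuming the
DataStream API; the 60s interval and EXACTLY_ONCE mode follow this description, the rest
is illustrative):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60s in EXACTLY_ONCE mode, matching the job described above.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        // Note: the FileSink commits pending part files only when a checkpoint/savepoint
        // completes; records written after the last successful commit stay in
        // in-progress/pending files until the next commit.
    }
}
```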

The problem happens when I stop the job with a savepoint. The job generates a 
savepoint that contains the Kinesis offsets, but new files are uploaded to S3 
containing records that go beyond the offsets in the savepoint.

I doubt that's the normal behavior, because it breaks the exactly-once guarantee.
Has anyone encountered this kind of behavior?

That’s how the sink is defined:
FileSink.forBulkFormat(new Path(basePath), new ParquetWriterFactory<>(parquetBuilder))
    .withBucketAssigner(new DateTimeBucketAssigner<>(dateTimeFormat))
    .withOutputFileConfig(OutputFileConfig.builder()
        .withPartPrefix(String.format("part-%s", UUID.randomUUID()))
        .withPartSuffix(String.format("%s.parquet", compressionCodecName.getExtension()))
        .build())
    .build();

Thanks,
Vararu Vadim.


Re:Execute Python UDF in Java Flink application

2024-04-03 Thread Xuyang
Hi, Tony.
I think it's easy to use Python UDFs from Java. You can find more 
details here [1][2].


[1] 
https://flink.apache.org/2020/04/09/pyflink-introducing-python-support-for-udfs-in-flinks-table-api/
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
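
For a concrete feel, here is a minimal sketch of calling a Python UDF from a Java Table API
program via the SQL DDL route of FLIP-106 [2]; the file path, module, and function names
below are placeholders, not taken from this thread:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PythonUdfFromJava {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Ship the Python file that defines the UDF (path is a placeholder). You may also
        // need to point python.executable / python.client.executable at a Python
        // environment with apache-flink installed.
        tEnv.getConfig().getConfiguration().setString("python.files", "/path/to/my_udfs.py");
        // Register the Python UDF with SQL DDL (FLIP-106), then call it from Java-built SQL.
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION my_upper AS 'my_udfs.my_upper' LANGUAGE PYTHON");
        tEnv.executeSql("SELECT my_upper('hello flink')").print();
    }
}
```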







--

Best!
Xuyang




At 2024-04-03 07:56:14, "Zhou, Tony"  wrote:

Hi everyone,

 

Out of curiosity, I have a high-level question about Flink: I have a use case 
where I want to define some UDFs in Python while having the main logic written 
in Java. I am wondering how complex this design choice is going to be, 
or whether it is even possible with Flink.

 

Thanks,

Tony

Re: Debugging Kryo Fallback

2024-04-02 Thread Salva Alcántara
FYI Reposted in SO:
-
https://stackoverflow.com/questions/78265380/how-to-debug-the-kryo-fallback-in-flink
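
As an aside, a minimal sketch of one way to surface the extracted type without a debugger,
using standard TypeInformation utilities (the MyEvent class is illustrative):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public class KryoFallbackCheck {
    // Replace with the class you suspect is not analyzed as a POJO.
    public static class MyEvent {
        public long id;
        public String name;
    }

    public static void main(String[] args) {
        TypeInformation<MyEvent> info = TypeInformation.of(MyEvent.class);
        // A GenericTypeInfo result means Flink will serialize this type with Kryo.
        System.out.println(info + " -> Kryo fallback: " + (info instanceof GenericTypeInfo));
    }
}
```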

On Thu, Mar 28, 2024 at 7:24 AM Salva Alcántara 
wrote:

> I wonder which is the simplest way of troubleshooting/debugging what
> causes the Kryo fallback.
>
> Detecting it is just a matter of adding this line to your job:
>
> ```
> env.getConfig().disableGenericTypes();
> ```
>
> or in more recent versions:
>
> ```
> pipeline.generic-types: false
>
> ```
>
> But once you detect the issue, what is the simplest way to debug it? You can 
> of course add a breakpoint in:
> org.apache.flink.api.java.typeutils.TypeExtractor@analyzePojo
>
> but ideally there should be a simpler way to show all the problems 
> encountered to the user without having to get that deep into the code.
>
> Thanks in advance,
>
> Salva
>
>


Re: join two streams with pyflink

2024-04-02 Thread Biao Geng
Hi Thierry,

Your case is not very complex, and I believe all of Flink's programming-language
interfaces (e.g. Java, Python, SQL) can do that.
When using pyflink, you can use pyflink datastream/table/SQL API.
Here are some examples of using pyflink table api:
https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/basic_operations.html
https://github.com/apache/flink/blob/master/flink-python/pyflink/table/tests/test_join.py

Hope it helps!

Best,
Biao Geng



On Tue, Apr 2, 2024 at 3:41 PM Fokou Toukam, Thierry wrote:

> Hi,
>
> I have 2 streams, as seen in this example (*6> {"tripId": "275118740",
> "timestamp": "2024-04-02T06:20:00Z", "stopSequence": 13, "stopId": "61261",
> "bearing": 0.0, "speed": 0.0, "vehicleId": "39006", "routeId": "747"}*
> *1> {"visibility": 1, "weather_conditions": "clear sky", "timestamp":
> "2024-04-02T02:19:39.281768"}*), and I want to merge them based on the
> timestamp value. Which Flink API can I use, or how can I do it?
>
> Thanks!
>
> *Thierry FOKOU *| * IT M.A.Sc  Student*
>
>
>
>


Re: How to handle tuple keys with null values

2024-04-02 Thread Hang Ruan
Hi Sachin.

I think maybe we could cast the Long to a String to handle the null value. Or,
as Asimansu said, try to filter out the null data.
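
A minimal sketch of the first idea, assuming Data has a non-null Long id and a nullable
Long id1 as in the snippet quoted below (the sentinel value is illustrative):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class NullSafeKeySelector implements KeySelector<Data, Tuple2<Long, String>> {
    @Override
    public Tuple2<Long, String> getKey(Data data) {
        // Map a null id1 to a sentinel string so the TupleSerializer never sees a null field.
        String id1 = data.id1 == null ? "__NULL__" : String.valueOf(data.id1);
        return Tuple2.of(data.id, id1);
    }
}
```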

Best,
Hang

On Wed, Apr 3, 2024 at 8:35 AM Asimansu Bera wrote:

> Hello Sachin,
>
> The same issue had been reported in the past and JIRA was closed without
> resolution.
>
> https://issues.apache.org/jira/browse/FLINK-4823
>
> I do see this as a data quality issue. You need to understand what you
> would like to do with the null value. Either way, it is better to filter out the
> null data earlier so that you do not need to manage the null, or you may
> also try using a POJO, as POJOs might support null.
>
> Sincerely,
> -A
>
>
> On Tue, Apr 2, 2024 at 12:21 PM Sachin Mittal  wrote:
>
>> Hello folks,
>> I am keying my stream using a Tuple:
>>
>> example:
>>
>> public class MyKeySelector implements KeySelector<Data, Tuple2<Long, Long>> {
>>
>> @Override
>> public Tuple2<Long, Long> getKey(Data data) {
>>   return Tuple2.of(data.id, data.id1);
>> }
>>
>> }
>>
>> Now id1 can have null values. In this case how should I handle this?
>>
>> Right now I am getting this error:
>>
>> java.lang.RuntimeException: Exception occurred while setting the current key 
>> context.
>> at 
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
>> [flink-dist-1.17.1.jar:1.17.1]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
>> [flink-dist-1.17.1.jar:1.17.1]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
>> [flink-dist-1.17.1.jar:1.17.1]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
>> Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but 
>> expected to hold a value.
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
>>  

Execute Python UDF in Java Flink application

2024-04-02 Thread Zhou, Tony
Hi everyone,

Out of curiosity, I have a high-level question about Flink: I have a use case 
where I want to define some UDFs in Python while having the main logic written 
in Java. I am wondering how complex this design choice is going to be, 
or whether it is even possible with Flink.

Thanks,
Tony


Re: GCS FileSink Read Timeouts

2024-04-02 Thread Asimansu Bera
Hello Dylan,

I'm not an expert.

There are many configuration settings (tuning options) which can be set up via the
Flink configuration. Please refer to the second link below - specifically the
retry options.

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/filesystems/gcs/
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.18/gcs/CONFIGURATION.md

Thanks-
-A

On Tue, Apr 2, 2024 at 1:13 PM Dylan Fontana via user 
wrote:

> Hey Flink Users,
>
> We've been facing an issue with GCS that I'm keen to hear the community's
> thoughts or insights on.
>
> We're using the GCS FileSystem on a FileSink to write parquets in our
> Flink app. We're finding sporadic instances of
> `com.google.cloud.storage.StorageException: Read timed out` that cause our
> job to restart. While we have tolerance in place for failed checkpoints,
> this causes many more failures/restarts as compared to other FileSystems
> like AWS or Azure we use. We've tried tuning the size of the files we write
> but found no improvement; our parquets are already "tiny" - _many_ parquets
> on the order of 1-10KB. Following multiple stack traces, we see the
> exception raised from multiple parts of the sink lifecycle:
> FileWriter::prepareCommit, FileWriter::write, and FileCommitter::commit.
>
> Our hypothesis is sporadic failures from GCS HTTP APIs that aren't getting
> retried correctly or need a longer timeout than the default (20 seconds for
> Read timeouts, 50 seconds for Retries overall). This problem is infrequent
> enough that it's hard to reproduce/test; it comes and goes on how noisy it
> is.
>
> I noticed we can't tune any google-cloud-storage parameters via
> flink-config; there's FLINK-32877[1] which proposed adding Read/Connection
> Timeout parameters for the HTTPTransportOptions[2] but it's still open. I
> also noticed there's more we can change like what gets retried in the
> StorageRetryStrategy[3] and the RetrySettings[4]. Ultimately I'm thinking
> of creating an alternate FileSystemFactory in our deployment (under a
> different scheme/plugin) to test how tweaking these options in the
> StorageOptions.Builder[5] call works out.
>
> Have other GCS FileSink users hit these exceptions? What did you do?
> Anything else we might need to consider?
>
> -Dylan
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-32877
> [2]:
> https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.http.HttpTransportOptions
> [3]:
> https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.StorageRetryStrategy
> [4]:
> https://github.com/googleapis/sdk-platform-java/blob/a94c2f0e8a99f0ddf17106cbc8117cefe6b0e127/java-core/google-cloud-core/src/main/java/com/google/cloud/ServiceOptions.java#L787
> [5]:
> https://github.com/apache/flink/blob/163b9cca6d2ccac0ff89dd985e3232667ddfb14f/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L94
>


Re: How to handle tuple keys with null values

2024-04-02 Thread Asimansu Bera
Hello Sachin,

The same issue had been reported in the past and JIRA was closed without
resolution.

https://issues.apache.org/jira/browse/FLINK-4823

I do see this as a data quality issue. You need to understand what you
would like to do with the null value. Either way, it is better to filter out the
null data earlier so that you do not need to manage the null, or you may
also try using a POJO, as POJOs might support null.

Sincerely,
-A


On Tue, Apr 2, 2024 at 12:21 PM Sachin Mittal  wrote:

> Hello folks,
> I am keying my stream using a Tuple:
>
> example:
>
> public class MyKeySelector implements KeySelector<Data, Tuple2<Long, Long>> {
>
> @Override
> public Tuple2<Long, Long> getKey(Data data) {
>   return Tuple2.of(data.id, data.id1);
> }
>
> }
>
> Now id1 can have null values. In this case how should I handle this?
>
> Right now I am getting this error:
>
> java.lang.RuntimeException: Exception occurred while setting the current key 
> context.
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
> [flink-dist-1.17.1.jar:1.17.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
> [flink-dist-1.17.1.jar:1.17.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-dist-1.17.1.jar:1.17.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
> Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but 
> expected to hold a value.
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> ... 18 more
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:67)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> 

GCS FileSink Read Timeouts

2024-04-02 Thread Dylan Fontana via user
Hey Flink Users,

We've been facing an issue with GCS that I'm keen to hear the community's
thoughts or insights on.

We're using the GCS FileSystem on a FileSink to write parquets in our Flink
app. We're finding sporadic instances of
`com.google.cloud.storage.StorageException: Read timed out` that cause our
job to restart. While we have tolerance in place for failed checkpoints,
this causes many more failures/restarts as compared to other FileSystems
like AWS or Azure we use. We've tried tuning the size of the files we write
but found no improvement; our parquets are already "tiny" - _many_ parquets
on the order of 1-10KB. Following multiple stack traces, we see the
exception raised from multiple parts of the sink lifecycle:
FileWriter::prepareCommit, FileWriter::write, and FileCommitter::commit.

Our hypothesis is sporadic failures from GCS HTTP APIs that aren't getting
retried correctly or need a longer timeout than the default (20 seconds for
Read timeouts, 50 seconds for Retries overall). This problem is infrequent
enough that it's hard to reproduce/test; it comes and goes on how noisy it
is.

I noticed we can't tune any google-cloud-storage parameters via
flink-config; there's FLINK-32877[1] which proposed adding Read/Connection
Timeout parameters for the HTTPTransportOptions[2] but it's still open. I
also noticed there's more we can change like what gets retried in the
StorageRetryStrategy[3] and the RetrySettings[4]. Ultimately I'm thinking
of creating an alternate FileSystemFactory in our deployment (under a
different scheme/plugin) to test how tweaking these options in the
StorageOptions.Builder[5] call works out.
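
For illustration, the kind of client-side tuning described in the previous paragraph might
look roughly like the following sketch against the google-cloud-storage builder APIs linked
below; the timeout and retry values are arbitrary examples, not recommendations:

```java
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.threeten.bp.Duration;

public class TunedGcsClient {
    public static Storage create() {
        HttpTransportOptions transport = HttpTransportOptions.newBuilder()
                .setConnectTimeout(60_000) // ms
                .setReadTimeout(60_000)    // ms, raises the 20s read timeout mentioned above
                .build();
        RetrySettings retries = RetrySettings.newBuilder()
                .setMaxAttempts(10)
                .setTotalTimeout(Duration.ofMinutes(5)) // raises the ~50s overall retry budget
                .build();
        return StorageOptions.newBuilder()
                .setTransportOptions(transport)
                .setRetrySettings(retries)
                .build()
                .getService();
    }
}
```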

Have other GCS FileSink users hit these exceptions? What did you do?
Anything else we might need to consider?

-Dylan


[1]: https://issues.apache.org/jira/browse/FLINK-32877
[2]:
https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.http.HttpTransportOptions
[3]:
https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.StorageRetryStrategy
[4]:
https://github.com/googleapis/sdk-platform-java/blob/a94c2f0e8a99f0ddf17106cbc8117cefe6b0e127/java-core/google-cloud-core/src/main/java/com/google/cloud/ServiceOptions.java#L787
[5]:
https://github.com/apache/flink/blob/163b9cca6d2ccac0ff89dd985e3232667ddfb14f/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L94


Re: Understanding checkpoint/savepoint storage requirements

2024-04-02 Thread Robert Young
Thank you both for the information!

Rob

On Thu, Mar 28, 2024 at 7:08 PM Asimansu Bera 
wrote:

> To add more details so that it is clear why access to a persistent object
> store is required for all JVM processes of a Flink job graph for consistent
> recovery.
> *Job Manager:*
>
> Flink's JobManager writes critical metadata during checkpoints for fault
> tolerance:
>
>- Job Configuration: Preserves job settings (parallelism, state
>backend) for consistent restarts.
>- Progress Information: Stores offsets (source/sink positions) to
>resume processing from the correct point after failures.
>- Checkpoint Counters: Provides metrics (ID, timestamp, duration) for
>monitoring checkpointing behavior.
>
>
> *Task Managers:*
> While the JobManager handles checkpoint metadata, TaskManagers are the
> workhorses during Flink checkpoints. Here's what they do:
>
>- State Snapshots: Upon receiving checkpoint instructions,
>TaskManagers capture snapshots of their current state. This state includes
>in-memory data and operator variables crucial for resuming processing.
>- State Serialization: The captured state is transformed into a format
>suitable for storage, often byte arrays. This serialized data represents
>the actual application state.
>
>
> Good network bandwidth is crucial so that TaskManagers can quickly write the
> large state of all operators to the HDFS/S3 object store. Oftentimes, some
> customers use NFS as a persistent store, which is not recommended, as NFS is
> slow and slows down checkpointing.
>
> -A
>
>
> On Wed, Mar 27, 2024 at 7:52 PM Feifan Wang  wrote:
>
>> Hi Robert :
>>
>> Your understanding is right!
>> To add some more information: the JobManager is not only responsible for cleaning
>> up old checkpoints, but also needs to write the metadata file to checkpoint
>> storage after all TaskManagers have taken their snapshots.
>>
>> ---
>> Best
>> Feifan Wang
>>
>> At 2024-03-28 06:30:54, "Robert Young"  wrote:
>>
>> Hi all, I have some questions about checkpoint and savepoint storage.
>>
>> From what I understand a distributed, production-quality job with a lot
>> of state should use durable shared storage for checkpoints and savepoints.
>> All job managers and task managers should access the same volume. So
>> typically you'd use hadoop, S3, Azure etc.
>>
>> In the docs [1] it states for state.checkpoints.dir: "The storage path
>> must be accessible from all participating processes/nodes(i.e. all
>> TaskManagers and JobManagers)."
>>
>> I want to understand why that is exactly. Here's my understanding:
>>
>> 1. The JobManager is responsible for cleaning old checkpoints, so it
>> needs access to all the files written out by all the task managers so it
>> can remove them.
>> 2. For recovery/rescaling if all nodes share the same volume then
>> TaskManagers can read/redistribute the checkpoint data easily, since the
>> volume is shared.
>>
>> Is that correct? Are there more aspects to why the directory must be
>> shared across the processes?
>>
>> Thank you,
>> Rob Young
>>
>> 1.
>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/checkpointing/#related-config-options
>>
>>


How to handle tuple keys with null values

2024-04-02 Thread Sachin Mittal
Hello folks,
I am keying my stream using a Tuple:

example:

public class MyKeySelector implements KeySelector<Data, Tuple2<Long, Long>> {

@Override
public Tuple2<Long, Long> getKey(Data data) {
  return Tuple2.of(data.id, data.id1);
}

}

Now id1 can have null values. In this case how should I handle this?

Right now I am getting this error:

java.lang.RuntimeException: Exception occurred while setting the
current key context.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
[flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.apache.flink.types.NullFieldException: Field 1 is null,
but expected to hold a value.
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
~[flink-dist-1.17.1.jar:1.17.1]
... 18 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:67)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:30)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:133)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
~[flink-dist-1.17.1.jar:1.17.1]


Thanks
Sachin


join two streams with pyflink

2024-04-02 Thread Fokou Toukam, Thierry
Hi,

I have 2 streams, as seen in this example (6> {"tripId": "275118740", 
"timestamp": "2024-04-02T06:20:00Z", "stopSequence": 13, "stopId": "61261", 
"bearing": 0.0, "speed": 0.0, "vehicleId": "39006", "routeId": "747"}
1> {"visibility": 1, "weather_conditions": "clear sky", "timestamp": 
"2024-04-02T02:19:39.281768"}), and I want to merge them based on the timestamp 
value. Which Flink API can I use, or how can I do it?

Thanks!


Thierry FOKOU |  IT M.A.Sc Student




Re: Hadoop dependency configuration issue

2024-04-01 Thread Biao Geng
Hi fengqi,

The “Hadoop is not in the classpath/dependencies.” error means that the classes
required by HDFS, such as org.apache.hadoop.conf.Configuration and
org.apache.hadoop.fs.FileSystem, could not be found.

If Hadoop is available in your system environment, the classpath is usually set like this:
export HADOOP_CLASSPATH=`hadoop classpath`

If you are submitting to a local standalone Flink cluster, you can check the log files
generated by Flink; they print the classpath, so you can see whether the Hadoop-related
classes are there.

Best,
Biao Geng


On Tue, Apr 2, 2024 at 10:24 AM ha.fen...@aisino.com wrote:

> 1. In the development environment, with the hadoop-client dependency added,
> checkpointing can access the HDFS path.
> 2. With Flink 1.19.0 and Hadoop 3.3.1, when the jar is submitted to a
> standalone Flink cluster, the following error is reported:
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobInitializationException: Could not start
> the JobMaster.
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: org.apache.flink.runtime.client.JobInitializationException:
> Could not start the JobMaster.
> at
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint
> storage at checkpoint coordinator side.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
> ... 3 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create
> checkpoint storage at checkpoint coordinator side.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:364)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:273)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:503)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:334)
> at
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:173)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:381)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:224)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:140)
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
> at
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:356)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 3 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> at
> 

Re: Handle exception for an empty Datastream

2024-04-01 Thread Biao Geng
Hi Nida,

The StreamExecutionEnvironment#fromCollection(java.util.Collection
data) method checks whether the input collection is empty and, if it is, throws the
exception you have met.
The 'list != null' check cannot get rid of the exception, but the
'!list.isEmpty()' check should do the trick.
Could you please show your code with the '!list.isEmpty()' check?
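
For reference, a minimal sketch of that guard; the element type and the processData() stub
are placeholders standing in for the code quoted below:

```java
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmptyListGuard {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> list = processData(); // placeholder for the user's processData()
        if (list == null || list.isEmpty()) {
            // Skip submitting the job instead of calling fromCollection on an empty list,
            // which is what throws the exception shown in the screenshot.
            System.out.println("No data to process; not executing the job.");
            return;
        }
        DataStream<String> orderA = env.fromCollection(list);
        orderA.print();
        env.execute("Job");
    }

    private static List<String> processData() {
        return Collections.emptyList(); // stand-in that returns an empty list
    }
}
```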

Best,
Biao Geng

On Tue, Apr 2, 2024 at 2:06 AM Fidea Lidea wrote:

> Hi Team,
>
> I have written a Flink Job which reads data in a List & then converts it
> to stream.
> *Example*:
>
>  public static void main(String[] args) throws Exception {
>   // set up execution environment
>   StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> . . .
>  List list=processData( );
>   DataStream orderA = env.fromCollection(list);
>   . . . .
>   env.execute("Job");
>   }
>
> If *list* is empty, the code throws the exception below on the Flink UI and
> does not execute the job. How can I handle this?
> I've tried using list != null and !list.isEmpty() checks, but they
> didn't work. Please provide suggestions.
>
> *Exception : *
>
> [image: image.png]
>
> Thanks  & Regards
> Nida
>
>


Re: Unsubscribe

2024-04-01 Thread Biao Geng
Hi,

To unsubscribe, please send an email with any content to
user-zh-unsubscr...@flink.apache.org.


Best,
Biao Geng

On Sun, Mar 31, 2024 at 10:25 PM CloudFunny wrote:

>
>


Re: Unsubscribe

2024-04-01 Thread Biao Geng
Hi,

To unsubscribe, please send an email with any content to
user-zh-unsubscr...@flink.apache.org.


Best,
Biao Geng

On Mon, Apr 1, 2024 at 11:09 AM 戴少 wrote:

> Unsubscribe
>
> --
>
> Best Regards,
>
>
>
>
>  Original message 
> | From | wangfengyang |
> | Date | 2024-03-22 17:28 |
> | To | user-zh  |
> | Subject | Unsubscribe |
Unsubscribe


Re: Unsubscribe

2024-04-01 Thread Biao Geng
Hi,

To unsubscribe, please send an email with any content to
user-zh-unsubscr...@flink.apache.org.


Best,
Biao Geng


On Sun, Mar 31, 2024 at 8:23 PM 杨东树 wrote:

> Requesting to unsubscribe from email notifications, thank you!


Re: Request to unsubscribe, thank you

2024-04-01 Thread Biao Geng
Hi,

To unsubscribe, please send an email with any content to
user-zh-unsubscr...@flink.apache.org.


Best,
Biao Geng

On Sun, Mar 31, 2024 at 10:20 PM,  wrote:

> Request to unsubscribe, thank you


Handle exception for an empty Datastream

2024-04-01 Thread Fidea Lidea
Hi Team,

I have written a Flink job which reads data into a List and then converts it to
a stream.
*Example*:

 public static void main(String[] args) throws Exception {
  // set up execution environment
  StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
. . .
 List list=processData( );
  DataStream orderA = env.fromCollection(list);
  . . . .
  env.execute("Job");
  }

If *list* is empty, the code throws the exception below on the Flink UI and does not
execute the job. How can I handle this?
I've tried using list != null and !list.isEmpty() checks, but they
didn't work. Please provide suggestions.

*Exception : *

[image: image.png]

Thanks  & Regards
Nida


Unsubscribe

2024-04-01 Thread 薛礼彬
Unsubscribe

Re: Flink pipeline throughput

2024-04-01 Thread Asimansu Bera
Hello Kartik,

For your case, if events ingested per second are 300/60 = 5 and the payload size is
2 KB, the ingestion size is 5 * 2 KB = 10 KB per second. The network buffer (memory
segment) size is 32 KB by default; you can also decrease the value to 16 KB.

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#taskmanager-memory-segment-size

You can also read:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/network_mem_tuning/

-A

On Mon, Apr 1, 2024 at 6:05 AM Kartik Kushwaha 
wrote:

> Thank you. I will check and get back on both the suggestions made by
> Asimansu and Xuyang.
>
> I am using Flink 1.17.0
>
> Regards,
> Kartik
>
> On Mon, Apr 1, 2024, 5:13 AM Asimansu Bera 
> wrote:
>
>> Hello Karthik,
>>
>> You may check the execution-buffer-timeout-interval parameter. This value
>> is an important one for your case. I had a similar issue experienced in the
>> past.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-buffer-timeout-interval
>>
>> For your case, not sure about your version of Flink, default value is
>> 100ms - means every operator will have 100ms wait time to fill up the
>> network buffer before flushing out the network buffer for data exchange to
>> downstream operators.
>>
>> You can change this parameter to 0 ( or few ms ) so that data would be
>> flushed for every (few) record(s). Then share your findings.
>>
>>
>> -A
>>
>>
>>
>>
>>
>>
>> On Sun, Mar 31, 2024 at 9:47 PM Xuyang  wrote:
>>
>>> Hi, Kartik.
>>> On flink ui, is there any operator that maintains a relatively high
>>> *busy*? Could you also try using a flame graph to provide more
>>> information?[1]
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/
>>>
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> At 2024-03-30 18:07:23, "Kartik Kushwaha" 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I have a Streaming event processing job that looks like this.
>>>
>>> *Source - ProcessFn(3 in total) - Sink*
>>>
>>> I am seeing a delay of 50ms to 250ms between each operators (maybe
>>> buffering or serde delays) leading to a slow end- to-end processing. What
>>> could be the reason for such high latency?
>>>
>>> Some more details:
>>> - Source operator is getting continuous events at a rate of 200 to 300
>>> events per minute through controlled tests.
>>> - Using DataStream between the operators. It has simple types and
>>> the input payload got from source in byte[] format as fields. Right now the
>>> size of the payload is in few kb's.
>>> - Same events are also processed by another Flink job that looks 
>>> *source-processFn(mongoWriter)-sink.
>>> *Here the end-to-end processing is less than 5ms. Similar Stream
>>> is being carried.
>>> - The original(problematic) pipeline, has extraction, validation,
>>> transformation processFn. But each of these steps get completed within
>>> couple of ms. I am calculating the processing time inside these process
>>> functions by *endTime - startTime* logic in the java code. So the
>>> Average time of the event inside the operators is just 1ms.
>>> - There is no back pressure shown in the flink ui on these operators.
>>> - Input events are continously flowing from the source at  a very high
>>> rate without any delays. So waiting on the buffer can be ruled out.
>>>
>>> Regards,
>>> Kartik
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: Flink pipeline throughput

2024-04-01 Thread Kartik Kushwaha
Thank you. I will check and get back on both the suggestions made by
Asimansu and Xuyang.

I am using Flink 1.17.0

Regards,
Kartik

On Mon, Apr 1, 2024, 5:13 AM Asimansu Bera  wrote:

> Hello Karthik,
>
> You may check the execution-buffer-timeout-interval parameter. This value
> is an important one for your case. I had a similar issue experienced in the
> past.
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-buffer-timeout-interval
>
> For your case, not sure about your version of Flink, default value is
> 100ms - means every operator will have 100ms wait time to fill up the
> network buffer before flushing out the network buffer for data exchange to
> downstream operators.
>
> You can change this parameter to 0 ( or few ms ) so that data would be
> flushed for every (few) record(s). Then share your findings.
>
>
> -A
>
>
>
>
>
>
> On Sun, Mar 31, 2024 at 9:47 PM Xuyang  wrote:
>
>> Hi, Kartik.
>> On flink ui, is there any operator that maintains a relatively high
>> *busy*? Could you also try using a flame graph to provide more
>> information?[1]
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2024-03-30 18:07:23, "Kartik Kushwaha" 
>> wrote:
>>
>> Hello,
>>
>> I have a Streaming event processing job that looks like this.
>>
>> *Source - ProcessFn(3 in total) - Sink*
>>
>> I am seeing a delay of 50ms to 250ms between each operators (maybe
>> buffering or serde delays) leading to a slow end- to-end processing. What
>> could be the reason for such high latency?
>>
>> Some more details:
>> - Source operator is getting continuous events at a rate of 200 to 300
>> events per minute through controlled tests.
>> - Using DataStream between the operators. It has simple types and
>> the input payload got from source in byte[] format as fields. Right now the
>> size of the payload is in few kb's.
>> - Same events are also processed by another Flink job that looks 
>> *source-processFn(mongoWriter)-sink.
>> *Here the end-to-end processing is less than 5ms. Similar Stream
>> is being carried.
>> - The original(problematic) pipeline, has extraction, validation,
>> transformation processFn. But each of these steps get completed within
>> couple of ms. I am calculating the processing time inside these process
>> functions by *endTime - startTime* logic in the java code. So the
>> Average time of the event inside the operators is just 1ms.
>> - There is no back pressure shown in the flink ui on these operators.
>> - Input events are continously flowing from the source at  a very high
>> rate without any delays. So waiting on the buffer can be ruled out.
>>
>> Regards,
>> Kartik
>>
>>
>>
>>
>>
>>
>>


Re: Re: Optimize exact deduplication for tens of billions data per day

2024-04-01 Thread Jeyhun Karimov
Hi Lei,

In addition to the valuable options suggested above, maybe you can try to
optimize your partitioning function (since you know your data).
Maybe sample [a subset of] your data if possible and/or check the key
distribution before re-defining your partitioning function.

Regards,
Jeyhun

On Mon, Apr 1, 2024 at 4:00 AM Xuyang  wrote:

> Hi, Wang.
>
> What about just increasing the parallelism to reduce the number of keys
> processed per parallel instance? Is the distribution
> of keys uneven? If so, you can use the DataStream API to manually
> implement some of the optimization methods of Flink SQL. [1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-03-29 21:49:42, "Lei Wang" wrote:
>
> Perhaps I can  keyBy(Hash(originalKey) % 10)
> Then in the KeyProcessOperator using MapState instead of ValueState
>   MapState  mapState
>
> There's about  10 OriginalKey for each mapState
>
> Hope this will help
>
> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry 
> wrote:
>
>> Hi Lei,
>>
>> Have you tried to make the key smaller, and store a list of found keys as
>> a value?
>>
>> Let's make the operator key a hash of your original key, and store a list
>> of the full keys in the state. You can play with your hash length to
>> achieve the optimal number of keys.
>>
>> I hope this helps,
>> Peter
>>
>> On Fri, Mar 29, 2024, 09:08 Lei Wang  wrote:
>>
>>>
>>> Use RocksDBBackend to store whether the element appeared within the last
>>> one day,  here is the code:
>>>
>>> *public class DedupFunction extends KeyedProcessFunction
>>> {*
>>>
>>> *private ValueState isExist;*
>>>
>>> *public void open(Configuration parameters) throws Exception {*
>>> *ValueStateDescriptor desc = new *
>>> *StateTtlConfig ttlConfig =
>>> StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..*
>>> *desc.enableTimeToLive(ttlConfig);*
>>> *isExist = getRuntimeContext().getState(desc);*
>>> *}*
>>>
>>> *public void processElement(IN in,  ) {*
>>> *if(null == isExist.value()) {*
>>> *out.collect(in)*
>>> *isExist.update(true)*
>>> *} *
>>> *}*
>>> *}*
>>>
>>> Because the number of distinct key is too large(about 10 billion one day
>>> ), there's performance bottleneck for this operator.
>>> How can I optimize the performance?
>>>
>>> Thanks,
>>> Lei
>>>
>>>
>>


Re: Flink pipeline throughput

2024-04-01 Thread Asimansu Bera
Hello Karthik,

You may check the execution-buffer-timeout-interval parameter. This value
is an important one for your case. I experienced a similar issue in the
past.

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-buffer-timeout-interval

For your case (I'm not sure about your version of Flink), the default value is 100ms,
which means every operator waits up to 100ms for the network buffer to fill up
before flushing it out for data exchange to downstream operators.

You can change this parameter to 0 ( or few ms ) so that data would be
flushed for every (few) record(s). Then share your findings.
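
For what it's worth, a minimal sketch of setting this programmatically with the DataStream
API (the 5 ms value is just an example):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flush network buffers after at most 5 ms instead of the 100 ms default;
        // 0 flushes after every record (lowest latency, more per-record overhead).
        env.setBufferTimeout(5);
    }
}
```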


-A






On Sun, Mar 31, 2024 at 9:47 PM Xuyang  wrote:

> Hi, Kartik.
> On flink ui, is there any operator that maintains a relatively high *busy*?
> Could you also try using a flame graph to provide more information?[1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-03-30 18:07:23, "Kartik Kushwaha" 
> wrote:
>
> Hello,
>
> I have a Streaming event processing job that looks like this.
>
> *Source - ProcessFn(3 in total) - Sink*
>
> I am seeing a delay of 50ms to 250ms between each operators (maybe
> buffering or serde delays) leading to a slow end- to-end processing. What
> could be the reason for such high latency?
>
> Some more details:
> - Source operator is getting continuous events at a rate of 200 to 300
> events per minute through controlled tests.
> - Using DataStream between the operators. It has simple types and
> the input payload got from source in byte[] format as fields. Right now the
> size of the payload is in few kb's.
> - Same events are also processed by another Flink job that looks 
> *source-processFn(mongoWriter)-sink.
> *Here the end-to-end processing is less than 5ms. Similar Stream is
> being carried.
> - The original(problematic) pipeline, has extraction, validation,
> transformation processFn. But each of these steps get completed within
> couple of ms. I am calculating the processing time inside these process
> functions by *endTime - startTime* logic in the java code. So the Average
> time of the event inside the operators is just 1ms.
> - There is no back pressure shown in the flink ui on these operators.
> - Input events are continously flowing from the source at  a very high
> rate without any delays. So waiting on the buffer can be ruled out.
>
> Regards,
> Kartik
>
>
>
>
>
>
>


Re: Re: Unsubscribe

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to
unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 戴少 
Sent: Monday, April 1, 2024 11:10
To: user-zh 
Cc: user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com 
;
 user-zh-subscribe ; user-zh 

Subject: Re: Unsubscribe

Unsubscribe

--

Best Regards,




 Original message 
| From | 李一飞 |
| Date | 2024-03-14 00:09 |
| To | 
user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com,
user-zh-subscribe ,
user-zh  |
| Subject | Unsubscribe |
Unsubscribe




Re: Unsubscribe

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to
unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: zjw 
Sent: Monday, April 1, 2024 11:05
To: user-zh@flink.apache.org 
Subject: Unsubscribe




Re: Re: Re: Re: 1.19 custom data source

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to
unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 熊柱 <18428358...@163.com>
Sent: Monday, April 1, 2024 11:14
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: 1.19 custom data source

Unsubscribe

At 2024-03-28 19:56:06, "Zhanghao Chen" wrote:
>If you are using the DataStream API, you can also check whether the newly added
>DataGen Connector [1] can directly meet your test data generation needs.
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
>
>Best,
>Zhanghao Chen
>
>From: ha.fen...@aisino.com 
>Sent: Thursday, March 28, 2024 15:34
>To: user-zh 
>Subject: Re: Re: 1.19 custom data source
>
>What I want to ask is: if I need to implement the Source interface, how should I
>write it? Is there a concrete example of implementing a class that generates
>custom data at a certain rate?
>
>From: gongzhongqiang
>Sent: 2024-03-28 15:05
>To: user-zh
>Subject: Re: 1.19 custom data source
>Hello,
>
>In Flink 1.19, SourceFunction is only marked as deprecated; it will be removed in
>a future version. So, to keep your application working with future Flink versions,
>you can re-implement these SourceFunctions with the new Source interface.
>
>On Thu, Mar 28, 2024 at 2:18 PM ha.fen...@aisino.com wrote:
>
>>
>> Previously I extended SourceFunction to implement some simple methods that
>> generate data automatically. In 1.19 it has been marked as deprecated, and it
>> seems the Source interface should be used instead, which is completely different
>> from the original SourceFunction. How should I generate a custom data source for
>> testing?
>>

Unsubscribe

2024-03-31 Thread 杨作青
Unsubscribe



Re: Re: Re: 1.19 custom data source

2024-03-31 Thread 熊柱
Unsubscribe

At 2024-03-28 19:56:06, "Zhanghao Chen" wrote:
>If you are using the DataStream API, you can also check whether the newly added
>DataGen Connector [1] can directly meet your test data generation needs.
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
>
>Best,
>Zhanghao Chen
>
>From: ha.fen...@aisino.com 
>Sent: Thursday, March 28, 2024 15:34
>To: user-zh 
>Subject: Re: Re: 1.19 custom data source
>
>What I want to ask is: if I need to implement the Source interface, how should I
>write it? Is there a concrete example of implementing a class that generates
>custom data at a certain rate?
>
>From: gongzhongqiang
>Sent: 2024-03-28 15:05
>To: user-zh
>Subject: Re: 1.19 custom data source
>Hello,
>
>In Flink 1.19, SourceFunction is only marked as deprecated; it will be removed in
>a future version. So, to keep your application working with future Flink versions,
>you can re-implement these SourceFunctions with the new Source interface.
>
>On Thu, Mar 28, 2024 at 2:18 PM ha.fen...@aisino.com wrote:
>
>>
>> Previously I extended SourceFunction to implement some simple methods that
>> generate data automatically. In 1.19 it has been marked as deprecated, and it
>> seems the Source interface should be used instead, which is completely different
>> from the original SourceFunction. How should I generate a custom data source for
>> testing?
>>


Re: Unsubscribe

2024-03-31 Thread 戴少
Unsubscribe

--

Best Regards,
 


 
 Original message 
| From | 李一飞 |
| Date | 2024-03-14 00:09 |
| To | 
user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com,
user-zh-subscribe ,
user-zh  |
| Subject | Unsubscribe |
Unsubscribe




Re: Unsubscribe

2024-03-31 Thread 戴少
Unsubscribe

--

Best Regards,
 


 
 Original message 
| From | wangfengyang |
| Date | 2024-03-22 17:28 |
| To | user-zh  |
| Subject | Unsubscribe |


Unsubscribe

2024-03-31 Thread zjw



Re:Re: Optimize exact deduplication for tens of billions data per day

2024-03-31 Thread Xuyang
Hi, Wang.


What about just increasing the parallelism to reduce the number of keys 
processed per parallel instance? Is the distribution
of keys uneven? If so, you can use the DataStream API to manually implement 
some of the optimization methods of Flink SQL. [1]


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation




--

Best!
Xuyang




At 2024-03-29 21:49:42, "Lei Wang" wrote:

Perhaps I can  keyBy(Hash(originalKey) % 10)
Then in the KeyProcessOperator using MapState instead of ValueState
  MapState  mapState


There's about  10 OriginalKey for each mapState 


Hope this will help


On Fri, Mar 29, 2024 at 9:24 PM Péter Váry  wrote:

Hi Lei,


Have you tried to make the key smaller, and store a list of found keys as a 
value?


Let's make the operator key a hash of your original key, and store a list of 
the full keys in the state. You can play with your hash length to achieve the 
optimal number of keys.


I hope this helps,
Peter


On Fri, Mar 29, 2024, 09:08 Lei Wang  wrote:



Use RocksDBBackend to store whether the element appeared within the last one 
day,  here is the code:


public class DedupFunction extends KeyedProcessFunction  {


private ValueState isExist;


public void open(Configuration parameters) throws Exception {
ValueStateDescriptor desc = new 
StateTtlConfig ttlConfig = 
StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..
desc.enableTimeToLive(ttlConfig);
isExist = getRuntimeContext().getState(desc);
}


public void processElement(IN in,  ) {
if(null == isExist.value()) {
out.collect(in)
isExist.update(true)
} 
}
}


Because the number of distinct key is too large(about 10 billion one day ), 
there's performance bottleneck for this operator.
How can I optimize the performance?


Thanks,
Lei
 


Re:Flink pipeline throughput

2024-03-31 Thread Xuyang
Hi, Kartik.
On the Flink UI, is there any operator that maintains a relatively high busy ratio? Could 
you also try using a flame graph to provide more information? [1]


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/




--

Best!
Xuyang




At 2024-03-30 18:07:23, "Kartik Kushwaha"  wrote:

Hello,


I have a Streaming event processing job that looks like this.


Source - ProcessFn(3 in total) - Sink


I am seeing a delay of 50ms to 250ms between each operators (maybe buffering or 
serde delays) leading to a slow end- to-end processing. What could be the 
reason for such high latency?


Some more details:
- Source operator is getting continuous events at a rate of 200 to 300 events 
per minute through controlled tests.
- Using DataStream between the operators. It has simple types and the 
input payload got from source in byte[] format as fields. Right now the size of 
the payload is in few kb's. 
- Same events are also processed by another Flink job that looks 
source-processFn(mongoWriter)-sink. Here the end-to-end processing is less than 
5ms. Similar Stream is being carried.
- The original(problematic) pipeline, has extraction, validation,  
transformation processFn. But each of these steps get completed within couple 
of ms. I am calculating the processing time inside these process functions by 
endTime - startTime logic in the java code. So the Average time of the event 
inside the operators is just 1ms. 
- There is no back pressure shown in the flink ui on these operators. 
- Input events are continously flowing from the source at  a very high rate 
without any delays. So waiting on the buffer can be ruled out.


Regards, 
Kartik













Unsubscribe

2024-03-31 Thread CloudFunny



Request to unsubscribe, thank you

2024-03-31 Thread wangwj03
Request to unsubscribe, thank you

Unsubscribe

2024-03-31 Thread 杨东树
Requesting to unsubscribe from email notifications, thank you!

Re: flink version stable

2024-03-30 Thread Lincoln Lee
Hi Thierry,

The flink connectors have been separated from the main flink repository[1],
using separate repositories and release process[2].
For example, https://github.com/apache/flink-connector-kafka for the Kafka
connector, and its latest release is v3.1.0[3].
You can follow new releases of specific connectors on the mailing list.

[1] https://lists.apache.org/thread/7qr8jc053y8xpygcwbhlqq4r7c7fj1p3
[2]
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
[3] https://lists.apache.org/thread/sz7f4o1orh96zgjjztcp5gh85l3ks26x


Best,
Lincoln Lee


On Fri, Mar 29, 2024 at 9:41 PM Fokou Toukam, Thierry wrote:

> I’m asking because I am seeing that the latest version doesn’t have all
> libraries, such as the Kafka connector
>
> *Thierry FOKOU *| * IT M.A.Sc  student*
>
> Département de génie logiciel et TI
>
> École de technologie supérieure  |  Université du Québec
>
> 1100, rue Notre-Dame Ouest
>
> Montréal (Québec)  H3C 1K3
>
> [image: image001] 
> --
> *De :* Junrui Lee 
> *Envoyé :* Friday, March 29, 2024 5:11:03 AM
> *À :* user 
> *Objet :* Re: flink version stable
>
> Hi,
>
> The latest stable version of FLINK is 1.19.0
>
>
>
>
> Fokou Toukam, Thierry 
> 于2024年3月29日周五 16:25写道:
>
> Hi, just want to know which version of flink is stable?
>
> *Thierry FOKOU *| * IT M.A.Sc  Student*
>
> Département de génie logiciel et TI
>
> École de technologie supérieure  |  Université du Québec
>
> 1100, rue Notre-Dame Ouest
>
> Montréal (Québec)  H3C 1K3
>
> Tél +1 (438) 336-9007
>
> [image: image001] 
>
>


Flink pipeline throughput

2024-03-30 Thread Kartik Kushwaha
Hello,

I have a Streaming event processing job that looks like this.

*Source - ProcessFn(3 in total) - Sink*

I am seeing a delay of 50ms to 250ms between operators (maybe
buffering or serde delays), leading to slow end-to-end processing. What
could be the reason for such high latency?

Some more details:
- The source operator is getting continuous events at a rate of 200 to 300
events per minute through controlled tests.
- Using DataStream between the operators. It has simple types, and the
input payload received from the source is carried as byte[] fields. Right now
the size of the payload is a few KB.
- The same events are also processed by another Flink job that looks like
*source-processFn(mongoWriter)-sink*. There the end-to-end processing is less
than 5ms, with a similar stream being carried.
- The original (problematic) pipeline has extraction, validation, and
transformation processFns, but each of these steps completes within a couple
of ms. I am calculating the processing time inside these process functions
with *endTime - startTime* logic in the Java code, so the average time of an
event inside the operators is just 1ms.
- There is no backpressure shown in the Flink UI for these operators.
- Input events are continuously flowing from the source at a very high rate
without any delays, so waiting on the buffer can be ruled out.

Regards,
Kartik


Re: Optimize exact deduplication for tens of billions data per day

2024-03-29 Thread Lei Wang
Perhaps I can keyBy(Hash(originalKey) % 10),
then in the KeyedProcessFunction use MapState instead of ValueState:
  MapState mapState

There are about 10 OriginalKeys for each MapState

Hope this will help
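
For illustration, a rough (untested) sketch of that idea — the Event type, its getKey()
accessor and the bucket count are placeholders, imports are omitted, and the 24h TTL
mirrors the original code:

DataStream<Event> deduped = input
    .keyBy(e -> Math.floorMod(e.getKey().hashCode(), 100_000))  // bucket count is hypothetical
    .process(new KeyedProcessFunction<Integer, Event, Event>() {
        private transient MapState<String, Boolean> seen;

        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Boolean> desc =
                new MapStateDescriptor<>("seen", String.class, Boolean.class);
            StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24)).build();
            desc.enableTimeToLive(ttl);
            seen = getRuntimeContext().getMapState(desc);
        }

        @Override
        public void processElement(Event in, Context ctx, Collector<Event> out) throws Exception {
            if (!seen.contains(in.getKey())) {  // dedup on the original key, not the bucket
                seen.put(in.getKey(), true);
                out.collect(in);
            }
        }
    });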

On Fri, Mar 29, 2024 at 9:24 PM Péter Váry 
wrote:

> Hi Lei,
>
> Have you tried to make the key smaller, and store a list of found keys as
> a value?
>
> Let's make the operator key a hash of your original key, and store a list
> of the full keys in the state. You can play with your hash length to
> achieve the optimal number of keys.
>
> I hope this helps,
> Peter
>
> On Fri, Mar 29, 2024, 09:08 Lei Wang  wrote:
>
>>
>> Use RocksDBBackend to store whether the element appeared within the last
>> one day,  here is the code:
>>
>> *public class DedupFunction extends KeyedProcessFunction  {*
>>
>> *private ValueState isExist;*
>>
>> *public void open(Configuration parameters) throws Exception {*
>> *ValueStateDescriptor desc = new *
>> *StateTtlConfig ttlConfig =
>> StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..*
>> *desc.enableTimeToLive(ttlConfig);*
>> *isExist = getRuntimeContext().getState(desc);*
>> *}*
>>
>> *public void processElement(IN in,  ) {*
>> *if(null == isExist.value()) {*
>> *out.collect(in)*
>> *isExist.update(true)*
>> *} *
>> *}*
>> *}*
>>
>> Because the number of distinct key is too large(about 10 billion one day
>> ), there's performance bottleneck for this operator.
>> How can I optimize the performance?
>>
>> Thanks,
>> Lei
>>
>>
>


Re: flink version stable

2024-03-29 Thread Fokou Toukam, Thierry
I’m asking because I am seeing that the latest version don’t have all libraries 
such as Kafka connector


Thierry FOKOU |  IT M.A.Sc student

Département de génie logiciel et TI

École de technologie supérieure  |  Université du Québec

1100, rue Notre-Dame Ouest

Montréal (Québec)  H3C 1K3



De : Junrui Lee 
Envoyé : Friday, March 29, 2024 5:11:03 AM
À : user 
Objet : Re: flink version stable

Hi,

The latest stable version of FLINK is 1.19.0




On Fri, Mar 29, 2024 at 16:25, Fokou Toukam, Thierry  wrote:
Hi, just want to know which version of flink is stable?


Thierry FOKOU |  IT M.A.Sc Student

Département de génie logiciel et TI

École de technologie supérieure  |  Université du Québec

1100, rue Notre-Dame Ouest

Montréal (Québec)  H3C 1K3

Tél +1 (438) 336-9007



Re: Optimize exact deduplication for tens of billions data per day

2024-03-29 Thread Péter Váry
Hi Lei,

Have you tried to make the key smaller, and store a list of found keys as a
value?

Let's make the operator key a hash of your original key, and store a list
of the full keys in the state. You can play with your hash length to
achieve the optimal number of keys.

I hope this helps,
Peter

On Fri, Mar 29, 2024, 09:08 Lei Wang  wrote:

>
> Use RocksDBBackend to store whether the element appeared within the last
> one day,  here is the code:
>
> *public class DedupFunction extends KeyedProcessFunction  {*
>
> *private ValueState isExist;*
>
> *public void open(Configuration parameters) throws Exception {*
> *ValueStateDescriptor desc = new *
> *StateTtlConfig ttlConfig =
> StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..*
> *desc.enableTimeToLive(ttlConfig);*
> *isExist = getRuntimeContext().getState(desc);*
> *}*
>
> *public void processElement(IN in,  ) {*
> *if(null == isExist.value()) {*
> *out.collect(in)*
> *isExist.update(true)*
> *} *
> *}*
> *}*
>
> Because the number of distinct key is too large(about 10 billion one day
> ), there's performance bottleneck for this operator.
> How can I optimize the performance?
>
> Thanks,
> Lei
>
>


Re: flink version stable

2024-03-29 Thread Junrui Lee
Hi,

The latest stable version of FLINK is 1.19.0



>
> Fokou Toukam, Thierry 
> 于2024年3月29日周五 16:25写道:
>
>> Hi, just want to know which version of flink is stable?
>>
>> *Thierry FOKOU *| * IT M.A.Sc  Student*
>>
>> Département de génie logiciel et TI
>>
>> École de technologie supérieure  |  Université du Québec
>>
>> 1100, rue Notre-Dame Ouest
>>
>> Montréal (Québec)  H3C 1K3
>>
>> Tél +1 (438) 336-9007
>>
>> [image: image001] 
>>
>


Re: Batch Job with Adaptive Batch Scheduler failing with JobInitializationException: Could not start the JobMaster

2024-03-29 Thread Junrui Lee
Hi Dipak,

Regarding question 1, I noticed from the logs that the method
createBatchExecutionEnvironment from Beam is being used in your job. IIUC,
this method utilizes Flink's DataSet API. If indeed the DataSet API is
being used, the configuration option execution.batch-shuffle-mode will not
take effect, and you should set the ExecutionMode to BATCH_FORCED, in Beam
you can configure executionModeForBatch to BATCH_FORCED.
And this limitation has been documented in the latest Flink version 1.19
documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/elastic_scaling/#limitations-2
.

Best,
Junrui

On Fri, Mar 29, 2024 at 16:05, Dipak Tandel  wrote:

> Hi Everyone
>
> I am facing some issues while running the batch job on a Flink cluster
> using Adaptive Batch Scheduler. I have deployed a flink cluster on
> Kubernetes using the flink Kubernetes operator and submitted a job to the
> cluster using Apache beam FlinkRunner.  I am using Flink version 1.16.   I
> want to figure out two things.
>
>
>
> *1.* I am trying to run one batch job with an adaptive batch scheduler.
> I referred to the elastic scaling document [1] and based on the document I
> have added the below config to my FlinkDeployment. But the job is failing
> with JobInitializationException: Could not start the JobMaster. I have
> attached the job manager log below. *Can someone explain why the job is
> failing despite using the recommended configuration?*
>
> jobmanager.scheduler: AdaptiveBatch
> parallelism.default: "-1"
> taskmanager.network.memory.buffers-per-channel: "0"
> execution.batch-shuffle-mode: "ALL_EXCHANGES_BLOCKING"
>
>
>
> *2. *The document has listed some limitations of elastic scaling. One of
> them is about the input file format. *My code is written in Apache Beam,
> how to check, how the input is being read in Flink?*
>
>
>- FileInputFormat sources are not supported: FileInputFormat sources
>are not supported, including StreamExecutionEnvironment#readFile(...)
>StreamExecutionEnvironment#readTextFile(...) and 
> StreamExecutionEnvironment#createInput(FileInputFormat,
>...). Users should use the new sources(FileSystem DataStream Connector
>
> 
> or FileSystem SQL Connector
>
> )
>to read files when using the Adaptive Batch Scheduler.
>
>
> Let me know if additional information is needed to debug the issue.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
>
> Regards
> Dipak
>
>


Row to tuple conversion in PyFlink when switching to 'thread' execution mode

2024-03-29 Thread Wouter Zorgdrager
Dear readers,

I'm running into some unexpected behaviour in PyFlink when switching
execution mode from process to thread. In thread mode, my `Row` gets
converted to a tuple whenever I use a UDF in a map operation. By this
conversion to tuples, we lose critical information such as column names.
Below is a minimal working example (mostly taken from the documentation):

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")


# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
map_function,
result_type=DataTypes.ROW(
[
DataTypes.FIELD("a", DataTypes.BIGINT()),
DataTypes.FIELD("b", DataTypes.BIGINT()),
]
),
)
table = (
t_env.from_elements(
[(2, 4), (0, 0)],
schema=DataTypes.ROW(
[
DataTypes.FIELD("a", DataTypes.BIGINT()),
DataTypes.FIELD("b", DataTypes.BIGINT()),
]
),
)
.map(func)
.alias("a", "b")
.execute()
.print()
)
```
This results in the following exception (log timestamps: 2024-03-28 16:32:10):

Caused by: pemja.core.PythonException: : 'tuple' object has no attribute 'a'
    at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
    at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
    at .(:1)
    at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
Note that in process mode this works perfectly fine. Is this expected
behaviour and/or is there a workaround?

Kind regards,
Wouter


Batch Job with Adaptive Batch Scheduler failing with JobInitializationException: Could not start the JobMaster

2024-03-29 Thread Dipak Tandel
Hi Everyone

I am facing some issues while running the batch job on a Flink cluster
using Adaptive Batch Scheduler. I have deployed a flink cluster on
Kubernetes using the flink Kubernetes operator and submitted a job to the
cluster using Apache beam FlinkRunner.  I am using Flink version 1.16.   I
want to figure out two things.



*1.* I am trying to run one batch job with an adaptive batch scheduler.  I
referred to the elastic scaling document [1] and based on the document I
have added the below config to my FlinkDeployment. But the job is failing
with JobInitializationException: Could not start the JobMaster. I have
attached the job manager log below. *Can someone explain why the job is
failing despite using the recommended configuration?*

jobmanager.scheduler: AdaptiveBatch
parallelism.default: "-1"
taskmanager.network.memory.buffers-per-channel: "0"
execution.batch-shuffle-mode: "ALL_EXCHANGES_BLOCKING"



*2. *The document has listed some limitations of elastic scaling. One of
them is about the input file format. *My code is written in Apache Beam,
how to check, how the input is being read in Flink?*


   - FileInputFormat sources are not supported: FileInputFormat sources are
   not supported, including StreamExecutionEnvironment#readFile(...)
   StreamExecutionEnvironment#readTextFile(...) and
StreamExecutionEnvironment#createInput(FileInputFormat,
   ...). Users should use the new sources(FileSystem DataStream Connector
   

or FileSystem SQL Connector
   
)
   to read files when using the Adaptive Batch Scheduler.


Let me know if additional information is needed to debug the issue.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/#adaptive-batch-scheduler

Regards
Dipak
Enabling required built-in plugins
Linking flink-gs-fs-hadoop-1.16.3.jar to plugin directory
Successfully enabled flink-gs-fs-hadoop-1.16.3.jar
sed: couldn't open temporary file /opt/flink/conf/sedqH57Xz: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedw7wkdz: Read-only file 
system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only 
file system
Starting kubernetes-session as a console application on host 
flink-deployment-fl-7fcf489865-kcrxl.
2024-03-28 16:06:35,858 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 

2024-03-28 16:06:35,863 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  
Preconfiguration:
2024-03-28 16:06:35,864 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -


RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx9034530801 -Xms9034530801 -XX:MaxMetaspaceSize=268435456
dynamic_configs: -D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=1048576015b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=9034530801b -D 
jobmanager.memory.jvm-overhead.max=1048576015b
logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This 
will impact performance.
INFO  [] - Loading configuration property: blob.server.port, 6124
INFO  [] - Loading configuration property: kubernetes.jobmanager.annotations, 
flinkdeployment.flink.apache.org/generation:8
INFO  [] - Loading configuration property: kubernetes.jobmanager.replicas, 1
INFO  [] - Loading configuration property: jobmanager.rpc.address, 
flink-deployment-fl.flink
INFO  [] - Loading configuration property: kubernetes.taskmanager.cpu, 4.0
INFO  [] - Loading configuration property: kubernetes.service-account, flink
INFO  [] - Loading configuration property: kubernetes.cluster-id, 
flink-deployment-fl
INFO  [] - Loading configuration property: 
taskmanager.memory.framework.off-heap.size, 512m
INFO  [] - Loading configuration property: kubernetes.container.image, 
flink:1.16
INFO  [] - Loading configuration property: parallelism.default, -1
INFO  [] - Loading configuration property: kubernetes.namespace, flink
INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
INFO  [] - Loading configuration property: 
kubernetes.rest-service.exposed.type, ClusterIP
INFO  [] - Loading configuration property: 
kubernetes.jobmanager.owner.reference, 
uid:d17af556-505a-4084-9ae5-eea79aca86c6,name:flink-deployment-fl,controller:false,blockOwnerDeletion:true,apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment
INFO  [] - Loading configuration property: execution.batch-shuffle-mode, 
ALL_EXCHANGES_BLOCKING
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
8192m
INFO  [] - Loading configuration property: 

Optimize exact deduplication for tens of billions data per day

2024-03-29 Thread Lei Wang
Use RocksDBBackend to store whether the element appeared within the last
one day,  here is the code:

*public class DedupFunction extends KeyedProcessFunction  {*

*private ValueState isExist;*

*public void open(Configuration parameters) throws Exception {*
*ValueStateDescriptor desc = new *
*StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType..*
*desc.enableTimeToLive(ttlConfig);*
*isExist = getRuntimeContext().getState(desc);*
*}*

*public void processElement(IN in,  ) {*
*if(null == isExist.value()) {*
*out.collect(in)*
*isExist.update(true)*
*} *
*}*
*}*

Because the number of distinct keys is too large (about 10 billion per day),
there's a performance bottleneck for this operator.
How can I optimize the performance?

Thanks,
Lei


Re: One query just for curiosity

2024-03-29 Thread gongzhongqiang
Hi  Ganesh,

As Zhanghao Chen said before, he advised two solutions for different
scenarios.

1. Record processing is a CPU-bound task: scale up the parallelism of the task
and the Flink cluster to improve TPS.
2. Record processing is an IO-bound task: use Async I/O to reduce resource
cost and also get better performance.


Best,

Zhongqiang Gong

On Fri, Mar 29, 2024 at 12:00, Ganesh Walse  wrote:

> You mean to say we can process 32767 records in parallel. And may I know
> if this is the case then do we need to do anything for this.
>
> On Fri, 29 Mar 2024 at 8:08 AM, Zhanghao Chen 
> wrote:
>
>> Flink can be scaled up to a parallelism of 32767 at max. And if your
>> record processing is mostly IO-bound, you can further boost the throughput
>> via Async-IO [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Ganesh Walse 
>> *Sent:* Friday, March 29, 2024 4:48
>> *To:* user@flink.apache.org 
>> *Subject:* One query just for curiosity
>>
>> Hi Team,
>> If my 1 record gets processed in 1 second in a flink. Then what will be
>> the best time taken to process 1000 records in flink using maximum
>> parallelism.
>>
>


Re: IcebergSourceReader metrics

2024-03-29 Thread Péter Váry
Hi Chetas,
Are you looking for this information?

*  public IcebergSourceReaderMetrics(MetricGroup metrics, String
fullTableName) {*
*MetricGroup readerMetrics =*
*metrics.addGroup("IcebergSourceReader").addGroup("table",
fullTableName);*
*this.assignedSplits = readerMetrics.counter("assignedSplits");*
*this.assignedBytes = readerMetrics.counter("assignedBytes");*
*this.finishedSplits = readerMetrics.counter("finishedSplits");*
*this.finishedBytes = readerMetrics.counter("finishedBytes");*
*this.splitReaderFetchCalls =
readerMetrics.counter("splitReaderFetchCalls");*
*  }*


It could be found here:
https://github.com/apache/iceberg/blob/main/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java#L32-L39

Added here:
https://github.com/apache/iceberg/pull/5554

I hope this helps,
Peter

On Fri, Mar 29, 2024 at 2:43, Chetas Joshi  wrote:

> Hello,
>
> I am using Flink to read Iceberg (S3). I have enabled all the metrics
> scopes in my FlinkDeployment as below
>
> metrics.scope.jm: flink.jobmanager
> metrics.scope.jm.job: flink.jobmanager.job
> metrics.scope.tm: flink.taskmanager
> metrics.scope.tm.job: flink.taskmanager.job
> metrics.scope.task: flink.task
> metrics.scope.operator: flink.operator
>
>
> I send these metrics to Datadog. I am specifically interested in the
> IcebergSourceReader metrics. I could not find any information about what
> metrics to expect here
> .
> In datadog as well, I could not find any metrics related to the
> IcebergTableSourceReader. Can someone help me understand what metrics
> associated with the IcebergTableSourceReader should be reported and what
> metricGroup (my guess was operator) should they be part of?
>
> Thank you
> Chetas
>


flink version stable

2024-03-28 Thread Fokou Toukam, Thierry
Hi, just want to know which version of flink is stable?


Thierry FOKOU |  IT M.A.Sc Student

Département de génie logiciel et TI

École de technologie supérieure  |  Université du Québec

1100, rue Notre-Dame Ouest

Montréal (Québec)  H3C 1K3

Tél +1 (438) 336-9007



Re: Flink cache support

2024-03-28 Thread Marco Villalobos
Zhanghao is correct.  You can use what is called "keyed state".  It's like a 
cache.


https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/state/

> On Mar 28, 2024, at 7:48 PM, Zhanghao Chen  wrote:
> 
> Hi,
> 
> You can maintain a cache manually in your operator implementations. You can 
> load the initial cached data on the operator open() method before the 
> processing starts. However, this would set up a cache per task instance. If 
> you'd like to have a cache shared by all processing tasks without 
> duplication, you might set up a Redis service externally for that purpose.
> 
> Best,
> Zhanghao Chen
> From: Ganesh Walse 
> Sent: Friday, March 29, 2024 4:45
> To: user@flink.apache.org 
> Subject: Flink cache support
>  
> Hi Team,
> 
> In my project my requirement is to cache data from the oracle database where 
> the number of tables are more and the same data will be required for all the 
> transactions to process.
> 
> Can you please suggest the approach where cache should be 1st loaded in 
> memory then stream processing should start.
> 
> Thanks & regards,
> Ganesh Walse.



Re: One query just for curiosity

2024-03-28 Thread Zhanghao Chen
Yes. However, a huge parallelism would require additional coordination cost so 
you might need to set up the JobManager with a decent spec (at least 8C 16G by 
experience). Also, you'll need to make sure there's no external bottlenecks 
(e.g. reading/writing data from the external storage).

Best,
Zhanghao Chen

From: Ganesh Walse 
Sent: Friday, March 29, 2024 10:42
To: Zhanghao Chen 
Cc: user@flink.apache.org 
Subject: Re: One query just for curiosity

You mean to say we can process 32767 records in parallel. And may I know if 
this is the case then do we need to do anything for this.

On Fri, 29 Mar 2024 at 8:08 AM, Zhanghao Chen 
mailto:zhanghao.c...@outlook.com>> wrote:
Flink can be scaled up to a parallelism of 32767 at max. And if your record 
processing is mostly IO-bound, you can further boost the throughput via 
Async-IO [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/

Best,
Zhanghao Chen

From: Ganesh Walse mailto:ganesh.wa...@gmail.com>>
Sent: Friday, March 29, 2024 4:48
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: One query just for curiosity

Hi Team,
If my 1 record gets processed in 1 second in a flink. Then what will be the 
best time taken to process 1000 records in flink using maximum parallelism.


Re: need flink support framework for dependency injection

2024-03-28 Thread Ruibin Xing
Hi Thais,

Thanks, that's really detailed and inspiring! I think we can use the same
pattern for states too.

On Wed, Mar 27, 2024 at 6:40 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Ruibin,
>
>
>
> Our code [1] targets a very old version of Flink 1.8, for current
> development my employer didn’t decide (yet?) to contribute it to the public.
>
> That old code does not yet contain the abstractions for setup of state
> primitive, so let me sketch it here:
>
>- Derive a specific implementation per operator from
>SetupDualUnboundedBoundedState
>- All state primitive setup is then implemented in the respective
>open() function
>- Derive the operator and other (savepoint reader/writer) from this
>state setup class/trait
>- For convenience there is a boundedMode field that tells the operator
>whether run in bounded/streaming mode (as the time semantics are similar
>yet different)
>- This is one example where we ‘patched’ the non-public runtime
>implementation (mentioned in that other mail), therefore it needs to be
>maintained Flink version by Flink version 
>
>
>
> Feel free to query details …
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
> [1] https://github.com/VisecaCard/flink-commons
>
> [2] common savepoint setup:
>
>
>
>
> */** Marker trait for flink functions/operators that can run in both
> Bounded (BATCH) and Unbounded (PIPELINED) mode, * and for auxiliary
> functions for savepoint priming and reading. * * *@note
>
>
> *Derive a specific trait/mixin for each respective flink streaming
> function/operator that initializes *   state primitives. Mixin that
> trait into auxiliary functions for savepoint priming and reading, to have a
> common *   state initialization. * *@note *Call *[[*ch**.**viseca**.*
> *flink**.**operators**.**state**.**SetupDualUnboundedBoundedState*
> *#open(org.apache.flink.api.common.functions.RuntimeContext)*]]
>
>
>
> **   in order to initialize this field. * * */ *trait 
> SetupDualUnboundedBoundedState
> extends Serializable {
>
>
>
> */** Determines at runtime, if the job DAG is running in Bounded (BATCH)
> or Unbounded (PIPELINED) mode.** *@note *Call *[[*ch**.**viseca*
> *.**flink**.**operators**.**state**.**SetupDualUnboundedBoundedState*
> *#open(org.apache.flink.api.common.functions.RuntimeContext)*]]
>
>
> **   in order to initialize this field.* */   *@transient var 
> *boundedMode
> *= false
>
>
> */** Opens the respective function/operator for initialization of state
> primitives */   *def open(rtc: RuntimeContext): Unit = {
> *boundedMode *=
>   rtc match {
> case src: StreamingRuntimeContext =>
>   src.getTaskManagerRuntimeInfo.getConfiguration
> .get[RuntimeExecutionMode](ExecutionOptions.*RUNTIME_MODE*) ==
> RuntimeExecutionMode.
> *BATCH *case _ => false
>   }
>   }
> }
>
>
>
>
>
>
>
>
>
> *From:* Ruibin Xing 
> *Sent:* Wednesday, March 27, 2024 10:41 AM
> *To:* Schwalbe Matthias 
> *Cc:* Marco Villalobos ; Ganesh Walse <
> ganesh.wa...@gmail.com>; user@flink.apache.org
> *Subject:* Re: need flink support framework for dependency injection
>
>
>
> ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠
>
>
>
> Hi Thias,
>
> Could you share your approach to job setup using Spring, if that's
> possible? We also use Spring Boot for DI in jobs, primarily relying on
> profiles. I'm particularly interested in how you use the same job
> structures for different scenarios, such as reading savepoints. Thank you
> very much.
>
>
>
> On Wed, Mar 27, 2024 at 3:12 PM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi Ganesh,
>
> I tend to agree with Marco. However your 'feature request' is very loose
> and leave much room for misunderstanding.
>
> There are at least two scenarios for DI integration:
> - DI for job setup:
>   - we use spring for job setup, which
> - lets us use the same job structure for (at least) 4 scenarios:
> streaming job, batch job for savepoint priming, savepoint reading,
> transformation for complex schema changes -> savepoint writing
> - we also appreciate a very convenient integration of a layered
> configuration by means of spring profiles
> - we can easily replace e.g. sources and sinks for test/local
> develop/debug scenarios
> - however this can also easily be done without DI
> - our approach is public (Apache 2.0 license), if interested
> - DI for Flink would probably be counterproductive for a number of reasons
> (some guesswork here  )
> - from what I see, the Flink code base is separated into two
> clearly distinct parts: the public API, and the non-public implementation
> - Flink community takes great efforts to guarantee backwards
> compatibility of the public API, which also allows for replacing the
> underneath implementation
> - the private API mostly uses the Service-Locator pattern (sort
> of) also to make it 

Re: Flink cache support

2024-03-28 Thread Zhanghao Chen
Hi,

You can maintain a cache manually in your operator implementations. You can 
load the initial cached data on the operator open() method before the 
processing starts. However, this would set up a cache per task instance. If 
you'd like to have a cache shared by all processing tasks without duplication, 
you might set up a Redis service externally for that purpose.
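
For illustration, a rough sketch of the "load in open(), then process" pattern — the Oracle URL, the query, and the Transaction type with its accessors are placeholders, not a tested implementation:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ReferenceDataEnricher extends RichMapFunction<Transaction, Transaction> {

    private final String jdbcUrl;   // e.g. "jdbc:oracle:thin:@//host:1521/service" (placeholder)
    private final String user;
    private final String password;
    private transient Map<String, String> referenceData;

    public ReferenceDataEnricher(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the cache once per task instance, before any record is processed.
        referenceData = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, val FROM ref_table")) {  // placeholder query
            while (rs.next()) {
                referenceData.put(rs.getString("id"), rs.getString("val"));
            }
        }
    }

    @Override
    public Transaction map(Transaction t) {
        t.setReferenceValue(referenceData.get(t.getReferenceId()));  // placeholder accessors
        return t;
    }
}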

Best,
Zhanghao Chen

From: Ganesh Walse 
Sent: Friday, March 29, 2024 4:45
To: user@flink.apache.org 
Subject: Flink cache support

Hi Team,

In my project my requirement is to cache data from the oracle database where 
the number of tables are more and the same data will be required for all the 
transactions to process.

Can you please suggest the approach where cache should be 1st loaded in memory 
then stream processing should start.

Thanks & regards,
Ganesh Walse.




Re: One query just for curiosity

2024-03-28 Thread Ganesh Walse
You mean to say we can process 32767 records in parallel? And may I know, if
this is the case, do we need to do anything for this?

On Fri, 29 Mar 2024 at 8:08 AM, Zhanghao Chen 
wrote:

> Flink can be scaled up to a parallelism of 32767 at max. And if your
> record processing is mostly IO-bound, you can further boost the throughput
> via Async-IO [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
>
> Best,
> Zhanghao Chen
> --
> *From:* Ganesh Walse 
> *Sent:* Friday, March 29, 2024 4:48
> *To:* user@flink.apache.org 
> *Subject:* One query just for curiosity
>
> Hi Team,
> If my 1 record gets processed in 1 second in a flink. Then what will be
> the best time taken to process 1000 records in flink using maximum
> parallelism.
>


Re: One query just for curiosity

2024-03-28 Thread Zhanghao Chen
Flink can be scaled up to a parallelism of 32767 at max. And if your record 
processing is mostly IO-bound, you can further boost the throughput via 
Async-IO [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
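
For illustration, a rough sketch of the Async I/O pattern from [1] — MyAsyncClient and its
query() method returning a CompletableFuture are placeholders, and the timeout/capacity
values are arbitrary:

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichFunction extends RichAsyncFunction<String, String> {

    private transient MyAsyncClient client;  // placeholder non-blocking client

    @Override
    public void open(Configuration parameters) {
        client = new MyAsyncClient();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        client.query(key)  // assumed to return CompletableFuture<String>
              .whenComplete((result, error) -> {
                  if (error != null) {
                      resultFuture.completeExceptionally(error);
                  } else {
                      resultFuture.complete(Collections.singleton(result));
                  }
              });
    }
}

// Wiring it into the pipeline (timeout 1s, at most 100 in-flight requests):
DataStream<String> enriched = AsyncDataStream.unorderedWait(
        input, new AsyncEnrichFunction(), 1000, TimeUnit.MILLISECONDS, 100);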

Best,
Zhanghao Chen

From: Ganesh Walse 
Sent: Friday, March 29, 2024 4:48
To: user@flink.apache.org 
Subject: One query just for curiosity

Hi Team,
If my 1 record gets processed in 1 second in a flink. Then what will be the 
best time taken to process 1000 records in flink using maximum parallelism.


IcebergSourceReader metrics

2024-03-28 Thread Chetas Joshi
Hello,

I am using Flink to read Iceberg (S3). I have enabled all the metrics
scopes in my FlinkDeployment as below

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator


I send these metrics to Datadog. I am specifically interested in the
IcebergSourceReader metrics. I could not find any information about what
metrics to expect here
.
In datadog as well, I could not find any metrics related to the
IcebergTableSourceReader. Can someone help me understand what metrics
associated with the IcebergTableSourceReader should be reported and what
metricGroup (my guess was operator) should they be part of?

Thank you
Chetas


One query just for curiosity

2024-03-28 Thread Ganesh Walse
Hi Team,
If one record gets processed in 1 second in Flink, then what will be the
best time taken to process 1000 records in Flink using maximum parallelism?


Flink cache support

2024-03-28 Thread Ganesh Walse
Hi Team,

In my project, my requirement is to cache data from the Oracle database,
where there are many tables and the same data will be required for
processing all the transactions.

Can you please suggest an approach where the cache is first loaded in
memory and then stream processing starts?

Thanks & regards,
Ganesh Walse.


Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread Yanfei Lei
Congratulations!

Best,
Yanfei

On Thu, Mar 28, 2024 at 19:59, Zhanghao Chen  wrote:
>
> Congratulations!
>
> Best,
> Zhanghao Chen
> 
> From: Yu Li 
> Sent: Thursday, March 28, 2024 15:55
> To: d...@paimon.apache.org 
> Cc: dev ; user 
> Subject: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project
>
> CC the Flink user and dev mailing list.
>
> Paimon originated within the Flink community, initially known as Flink
> Table Store, and all our incubating mentors are members of the Flink
> Project Management Committee. I am confident that the bonds of
> enduring friendship and close collaboration will continue to unite the
> two communities.
>
> And congratulations all!
>
> Best Regards,
> Yu
>
> On Wed, 27 Mar 2024 at 20:35, Guojun Li  wrote:
> >
> > Congratulations!
> >
> > Best,
> > Guojun
> >
> > On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
> >
> > > Congratulations~
> > >
> > > > 2024年3月27日 15:54,王刚  写道:
> > > >
> > > > Congratulations~
> > > >
> > > >> 2024年3月26日 10:25,Jingsong Li  写道:
> > > >>
> > > >> Hi Paimon community,
> > > >>
> > > >> I’m glad to announce that the ASF board has approved a resolution to
> > > >> graduate Paimon into a full Top Level Project. Thanks to everyone for
> > > >> your help to get to this point.
> > > >>
> > > >> I just created an issue to track the things we need to modify [2],
> > > >> please comment on it if you feel that something is missing. You can
> > > >> refer to apache documentation [1] too.
> > > >>
> > > >> And, we already completed the GitHub repo migration [3], please update
> > > >> your local git repo to track the new repo [4].
> > > >>
> > > >> You can run the following command to complete the remote repo tracking
> > > >> migration.
> > > >>
> > > >> git remote set-url origin https://github.com/apache/paimon.git
> > > >>
> > > >> If you have a different name, please change the 'origin' to your remote
> > > name.
> > > >>
> > > >> Please join me in celebrating!
> > > >>
> > > >> [1]
> > > https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > > >> [2] https://github.com/apache/paimon/issues/3091
> > > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > > >> [4] https://github.com/apache/paimon
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > >
> > >


Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread Zhanghao Chen
Congratulations!

Best,
Zhanghao Chen

From: Yu Li 
Sent: Thursday, March 28, 2024 15:55
To: d...@paimon.apache.org 
Cc: dev ; user 
Subject: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

CC the Flink user and dev mailing list.

Paimon originated within the Flink community, initially known as Flink
Table Store, and all our incubating mentors are members of the Flink
Project Management Committee. I am confident that the bonds of
enduring friendship and close collaboration will continue to unite the
two communities.

And congratulations all!

Best Regards,
Yu

On Wed, 27 Mar 2024 at 20:35, Guojun Li  wrote:
>
> Congratulations!
>
> Best,
> Guojun
>
> On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
>
> > Congratulations~
> >
> > > 2024年3月27日 15:54,王刚  写道:
> > >
> > > Congratulations~
> > >
> > >> 2024年3月26日 10:25,Jingsong Li  写道:
> > >>
> > >> Hi Paimon community,
> > >>
> > >> I’m glad to announce that the ASF board has approved a resolution to
> > >> graduate Paimon into a full Top Level Project. Thanks to everyone for
> > >> your help to get to this point.
> > >>
> > >> I just created an issue to track the things we need to modify [2],
> > >> please comment on it if you feel that something is missing. You can
> > >> refer to apache documentation [1] too.
> > >>
> > >> And, we already completed the GitHub repo migration [3], please update
> > >> your local git repo to track the new repo [4].
> > >>
> > >> You can run the following command to complete the remote repo tracking
> > >> migration.
> > >>
> > >> git remote set-url origin https://github.com/apache/paimon.git
> > >>
> > >> If you have a different name, please change the 'origin' to your remote
> > name.
> > >>
> > >> Please join me in celebrating!
> > >>
> > >> [1]
> > https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > >> [2] https://github.com/apache/paimon/issues/3091
> > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > >> [4] https://github.com/apache/paimon
> > >>
> > >> Best,
> > >> Jingsong Lee
> >
> >


Re: Re: 1.19 custom data source

2024-03-28 Thread Zhanghao Chen
If you are using the DataStream API, you can also check whether the newly added DataGen Connector [1] can directly meet your test data generation needs.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
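
For what it's worth, a small sketch based on the datagen docs above (rate and count are arbitrary; imports omitted for brevity):

GeneratorFunction<Long, String> generator = index -> "event-" + index;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generator,
                1_000_000,                           // total number of records
                RateLimiterStrategy.perSecond(10),   // records per second
                Types.STRING);

DataStreamSource<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen");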

Best,
Zhanghao Chen

From: ha.fen...@aisino.com 
Sent: Thursday, March 28, 2024 15:34
To: user-zh 
Subject: Re: Re: 1.19 custom data source

What I want to ask is: if I need to implement the Source interface, how should I write it? Is there a concrete example of generating custom records at a certain rate?

From: gongzhongqiang
Sent: 2024-03-28 15:05
To: user-zh
Subject: Re: 1.19 custom data source
Hello:

In the current Flink 1.19 release, SourceFunction is only marked as deprecated; it will be
removed in a future version. So, to keep your application working with future Flink versions, you can re-implement these SourceFunctions with the new Source API.

On Thu, Mar 28, 2024 at 14:18, ha.fen...@aisino.com  wrote:

>
> Previously I extended SourceFunction to implement some simple methods that automatically generate data. In 1.19 it is already marked as deprecated, and it seems the Source interface should be used instead, which is completely different from the old SourceFunction. How should I build a custom data source for generating test data?
>


Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread gongzhongqiang
Congratulations!

Best,

Zhongqiang Gong

On Thu, Mar 28, 2024 at 15:57, Yu Li  wrote:

> CC the Flink user and dev mailing list.
>
> Paimon originated within the Flink community, initially known as Flink
> Table Store, and all our incubating mentors are members of the Flink
> Project Management Committee. I am confident that the bonds of
> enduring friendship and close collaboration will continue to unite the
> two communities.
>
> And congratulations all!
>
> Best Regards,
> Yu
>
> On Wed, 27 Mar 2024 at 20:35, Guojun Li  wrote:
> >
> > Congratulations!
> >
> > Best,
> > Guojun
> >
> > On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
> >
> > > Congratulations~
> > >
> > > > 2024年3月27日 15:54,王刚  写道:
> > > >
> > > > Congratulations~
> > > >
> > > >> 2024年3月26日 10:25,Jingsong Li  写道:
> > > >>
> > > >> Hi Paimon community,
> > > >>
> > > >> I’m glad to announce that the ASF board has approved a resolution to
> > > >> graduate Paimon into a full Top Level Project. Thanks to everyone
> for
> > > >> your help to get to this point.
> > > >>
> > > >> I just created an issue to track the things we need to modify [2],
> > > >> please comment on it if you feel that something is missing. You can
> > > >> refer to apache documentation [1] too.
> > > >>
> > > >> And, we already completed the GitHub repo migration [3], please
> update
> > > >> your local git repo to track the new repo [4].
> > > >>
> > > >> You can run the following command to complete the remote repo
> tracking
> > > >> migration.
> > > >>
> > > >> git remote set-url origin https://github.com/apache/paimon.git
> > > >>
> > > >> If you have a different name, please change the 'origin' to your
> remote
> > > name.
> > > >>
> > > >> Please join me in celebrating!
> > > >>
> > > >> [1]
> > >
> https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > > >> [2] https://github.com/apache/paimon/issues/3091
> > > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > > >> [4] https://github.com/apache/paimon
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > >
> > >
>


Re: Re: 1.19 custom data source

2024-03-28 Thread Shawn Huang
Hello, regarding how to implement the Source interface, you can refer to the following materials:

[1] FLIP-27: Refactor Source Interface - Apache Flink - Apache Software
Foundation

[2] How to integrate with Flink efficiently: core design and community progress of the Connector / Catalog API (qq.com)



Best,
Shawn Huang


On Thu, Mar 28, 2024 at 15:39, liuchao  wrote:

> Find an operator that implements the Source interface and use it as a reference.
>
>
> 刘超
> liuchao1...@foxmail.com
>
>
>
> 
>
>
>
>
> -- Original message --
> From: "user-zh" ha.fen...@aisino.com
> Sent: Thursday, March 28, 2024, 3:34 PM
> To: "user-zh"
> Subject: Re: Re: 1.19 custom data source
>
>
>
> What I want to ask is: if I need to implement the Source interface, how should I write it? Is there a concrete example of generating custom records at a certain rate?
> 
> From: gongzhongqiang
> Sent: 2024-03-28 15:05
> To: user-zh
> Subject: Re: 1.19 custom data source
> Hello:
> 
> In the current Flink 1.19 release, SourceFunction is only marked as deprecated; it will be
> removed in a future version. So, to keep your application working with future Flink versions, you can re-implement these SourceFunctions with the new Source API.
> 
> ha.fen...@aisino.com  
> 
> 
> Previously I extended SourceFunction to implement some simple methods that automatically generate data. In 1.19 it is already marked as deprecated, and it seems the Source interface should be used instead, which is completely different from the old SourceFunction. How should I build a custom data source for generating test data?
> 


Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread Rui Fan
Congratulations~

Best,
Rui

On Thu, Mar 28, 2024 at 3:55 PM Yu Li  wrote:

> CC the Flink user and dev mailing list.
>
> Paimon originated within the Flink community, initially known as Flink
> Table Store, and all our incubating mentors are members of the Flink
> Project Management Committee. I am confident that the bonds of
> enduring friendship and close collaboration will continue to unite the
> two communities.
>
> And congratulations all!
>
> Best Regards,
> Yu
>
> On Wed, 27 Mar 2024 at 20:35, Guojun Li  wrote:
> >
> > Congratulations!
> >
> > Best,
> > Guojun
> >
> > On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
> >
> > > Congratulations~
> > >
> > > > 2024年3月27日 15:54,王刚  写道:
> > > >
> > > > Congratulations~
> > > >
> > > >> 2024年3月26日 10:25,Jingsong Li  写道:
> > > >>
> > > >> Hi Paimon community,
> > > >>
> > > >> I’m glad to announce that the ASF board has approved a resolution to
> > > >> graduate Paimon into a full Top Level Project. Thanks to everyone
> for
> > > >> your help to get to this point.
> > > >>
> > > >> I just created an issue to track the things we need to modify [2],
> > > >> please comment on it if you feel that something is missing. You can
> > > >> refer to apache documentation [1] too.
> > > >>
> > > >> And, we already completed the GitHub repo migration [3], please
> update
> > > >> your local git repo to track the new repo [4].
> > > >>
> > > >> You can run the following command to complete the remote repo
> tracking
> > > >> migration.
> > > >>
> > > >> git remote set-url origin https://github.com/apache/paimon.git
> > > >>
> > > >> If you have a different name, please change the 'origin' to your
> remote
> > > name.
> > > >>
> > > >> Please join me in celebrating!
> > > >>
> > > >> [1]
> > >
> https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > > >> [2] https://github.com/apache/paimon/issues/3091
> > > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > > >> [4] https://github.com/apache/paimon
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > >
> > >
>


Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread Yu Li
CC the Flink user and dev mailing list.

Paimon originated within the Flink community, initially known as Flink
Table Store, and all our incubating mentors are members of the Flink
Project Management Committee. I am confident that the bonds of
enduring friendship and close collaboration will continue to unite the
two communities.

And congratulations all!

Best Regards,
Yu

On Wed, 27 Mar 2024 at 20:35, Guojun Li  wrote:
>
> Congratulations!
>
> Best,
> Guojun
>
> On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
>
> > Congratulations~
> >
> > > 2024年3月27日 15:54,王刚  写道:
> > >
> > > Congratulations~
> > >
> > >> 2024年3月26日 10:25,Jingsong Li  写道:
> > >>
> > >> Hi Paimon community,
> > >>
> > >> I’m glad to announce that the ASF board has approved a resolution to
> > >> graduate Paimon into a full Top Level Project. Thanks to everyone for
> > >> your help to get to this point.
> > >>
> > >> I just created an issue to track the things we need to modify [2],
> > >> please comment on it if you feel that something is missing. You can
> > >> refer to apache documentation [1] too.
> > >>
> > >> And, we already completed the GitHub repo migration [3], please update
> > >> your local git repo to track the new repo [4].
> > >>
> > >> You can run the following command to complete the remote repo tracking
> > >> migration.
> > >>
> > >> git remote set-url origin https://github.com/apache/paimon.git
> > >>
> > >> If you have a different name, please change the 'origin' to your remote
> > name.
> > >>
> > >> Please join me in celebrating!
> > >>
> > >> [1]
> > https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > >> [2] https://github.com/apache/paimon/issues/3091
> > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > >> [4] https://github.com/apache/paimon
> > >>
> > >> Best,
> > >> Jingsong Lee
> >
> >


Re: 1.19 custom data source

2024-03-28 Thread gongzhongqiang
Hello:

In the current Flink 1.19 release, SourceFunction is only marked as deprecated; it will be
removed in a future version. So, to keep your application working with future Flink versions, you can re-implement these SourceFunctions with the new Source API.

On Thu, Mar 28, 2024 at 14:18, ha.fen...@aisino.com  wrote:

>
> Previously I extended SourceFunction to implement some simple methods that automatically generate data. In 1.19 it is already marked as deprecated, and it seems the Source interface should be used instead, which is completely different from the old SourceFunction. How should I build a custom data source for generating test data?
>


How to define the termination criteria for iterations in Flink ML?

2024-03-28 Thread Komal M
Hi all,

I have another question regarding Flink ML’s iterations.

In the documentation it says “The iterative algorithm has an iteration body 
that is repeatedly invoked until some termination criteria is reached (e.g. 
after a user-specified number of epochs has been reached).”

My question is what exactly is the syntax (in Java) to define the number of 
epochs in the code? Or any other termination criteria inside the iteration body?

I’m using Flink v1.17.2, Flink ML v.2.3.0

Thanks in advance,
Komal







Debugging Kryo Fallback

2024-03-28 Thread Salva Alcántara
I wonder which is the simplest way of troubleshooting/debugging what causes
the Kryo fallback.

Detecting it is just a matter of adding this line to your job:

```
env.getConfig().disableGenericTypes();
```

or in more recent versions:

```
pipeline.generic-types: false

```

But once you detect the issue, what is the simplest way to debug it?
You can of course add a breakpoint in:
org.apache.flink.api.java.typeutils.TypeExtractor@analyzePojo

but ideally there should be a simpler way to show all the problems
encountered to the user without having to get that deep into the code.
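
One low-tech sketch that can help (MyEvent is a placeholder, imports omitted): resolve the
type information explicitly in a small test, which both tells you whether the type falls back
to GenericTypeInfo (i.e. Kryo) and makes the TypeExtractor log its reasons (missing default
constructor, non-public fields without getters/setters, etc.):

TypeInformation<MyEvent> typeInfo = TypeInformation.of(MyEvent.class);
if (typeInfo instanceof GenericTypeInfo) {
    throw new IllegalStateException(MyEvent.class + " would fall back to Kryo: " + typeInfo);
}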

Thanks in advance,

Salva


Re: Understanding checkpoint/savepoint storage requirements

2024-03-28 Thread Asimansu Bera
To add more detail on why access to a persistent object store is required for
all JVM processes of a Flink job graph for consistent recovery:
*JoB Manager:*

Flink's JobManager writes critical metadata during checkpoints for fault
tolerance:

   - Job Configuration: Preserves job settings (parallelism, state backend)
   for consistent restarts.
   - Progress Information: Stores offsets (source/sink positions) to resume
   processing from the correct point after failures.
   - Checkpoint Counters: Provides metrics (ID, timestamp, duration) for
   monitoring checkpointing behavior.


*Task Managers:*
While the JobManager handles checkpoint metadata, TaskManagers are the
workhorses during Flink checkpoints. Here's what they do:

   - State Snapshots: Upon receiving checkpoint instructions, TaskManagers
   capture snapshots of their current state. This state includes in-memory
   data and operator variables crucial for resuming processing.
   - State Serialization: The captured state is transformed into a format
   suitable for storage, often byte arrays. This serialized data represents
   the actual application state.


Good network bandwidth is crucial so that the TaskManagers can write the large
state from all operator states to the HDFS/S3 object store quickly. Oftentimes,
customers use NFS as a persistent store, which is not recommended: NFS is slow
and slows down checkpointing.

-A


On Wed, Mar 27, 2024 at 7:52 PM Feifan Wang  wrote:

> Hi Robert :
>
> Your understanding are right !
> Add some more information : JobManager not only responsible for cleaning
> old checkpoints, but also needs to write metadata file to checkpoint
> storage after all taskmanagers have taken snapshots.
>
> ---
> Best
> Feifan Wang
>
> At 2024-03-28 06:30:54, "Robert Young"  wrote:
>
> Hi all, I have some questions about checkpoint and savepoint storage.
>
> From what I understand a distributed, production-quality job with a lot of
> state should use durable shared storage for checkpoints and savepoints. All
> job managers and task managers should access the same volume. So typically
> you'd use hadoop, S3, Azure etc.
>
> In the docs [1] it states for state.checkpoints.dir: "The storage path
> must be accessible from all participating processes/nodes(i.e. all
> TaskManagers and JobManagers)."
>
> I want to understand why that is exactly. Here's my understanding:
>
> 1. The JobManager is responsible for cleaning old checkpoints, so it needs
> access to all the files written out by all the task managers so it can
> remove them.
> 2. For recovery/rescaling if all nodes share the same volume then
> TaskManagers can read/redistribute the checkpoint data easily, since the
> volume is shared.
>
> Is that correct? Are there more aspects to why the directory must be
> shared across the processes?
>
> Thank you,
> Rob Young
>
> 1.
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/checkpointing/#related-config-options
>
>


Re: Flink job unable to restore from savepoint

2024-03-27 Thread prashant parbhane
Flink version 1.17.
We didn't change any job configuration.

We are facing the issue below:
https://issues.apache.org/jira/browse/FLINK-23886

On Wed, Mar 27, 2024 at 1:39 AM Hangxiang Yu  wrote:

> Hi, Prashant.
> Which Flink version did you use?
> And Did you modify your job logic or configurations ? If yes, Could you
> share changed things ?
>
> On Wed, Mar 27, 2024 at 3:37 PM prashant parbhane 
> wrote:
>
>> Hello,
>>
>> We have been facing this weird issue of not being able to restore from
>> savepoint, when we have a significant load on flink jobs.
>>
>> "
>> *org.apache.flink.util.FlinkRuntimeException: Error while deserializing
>> the user key.*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)*
>>
>> *Caused by: java.io.EOFException*
>>
>> *at
>> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)*
>>
>> *at
>> org.apache.flink.types.StringValue.readString(StringValue.java:781)*
>>
>> *at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)*
>>
>> *at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492*
>> "
>>
>>
>> Any suggestions?
>>
>> Thanks,
>> Prashant
>>
>>
>>
>>
>>
>
> --
> Best,
> Hangxiang.
>


Re:Understanding checkpoint/savepoint storage requirements

2024-03-27 Thread Feifan Wang
Hi Robert :


Your understanding is right!
To add some more information: the JobManager is not only responsible for cleaning old 
checkpoints, but also needs to write the metadata file to checkpoint storage after 
all TaskManagers have taken their snapshots.


---
Best
Feifan Wang



At 2024-03-28 06:30:54, "Robert Young"  wrote:

Hi all, I have some questions about checkpoint and savepoint storage.

From what I understand a distributed, production-quality job with a lot of 
state should use durable shared storage for checkpoints and savepoints. All job 
managers and task managers should access the same volume. So typically you'd 
use hadoop, S3, Azure etc.

In the docs [1] it states for state.checkpoints.dir: "The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers)."

I want to understand why that is exactly. Here's my understanding:

1. The JobManager is responsible for cleaning old checkpoints, so it needs 
access to all the files written out by all the task managers so it can remove 
them.
2. For recovery/rescaling if all nodes share the same volume then TaskManagers 
can read/redistribute the checkpoint data easily, since the volume is shared.

Is that correct? Are there more aspects to why the directory must be shared 
across the processes?

Thank you,
Rob Young

1. 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/checkpointing/#related-config-options


Re:[discuss] [jdbc] Support Ignore deleting is required?

2024-03-27 Thread Xuyang
Hi, ouywl.
IMO, +1 for this option. You can start a discussion on the dev mailing list[1] 
to seek more input from more community developers.


[1] d...@flink.apache.org



--

Best!
Xuyang




At 2024-03-27 11:28:37, "ou...@139.com"  wrote:

When using the JDBC sink connector, there are a lot of scenarios where we want to 
ignore delete (-D) records when writing out data. Our JDBC sink connector 
scenario needs an additional configuration attribute, sink.ignore-delete. The default 
value is false, meaning data deletions are not ignored; the user can set it to true.





ou...@139.com

Understanding checkpoint/savepoint storage requirements

2024-03-27 Thread Robert Young
Hi all, I have some questions about checkpoint and savepoint storage.

From what I understand, a distributed, production-quality job with a lot of
state should use durable shared storage for checkpoints and savepoints. All
job managers and task managers should access the same volume. So typically
you'd use hadoop, S3, Azure etc.

In the docs [1] it states for state.checkpoints.dir: "The storage path must
be accessible from all participating processes/nodes(i.e. all TaskManagers
and JobManagers)."

I want to understand why that is exactly. Here's my understanding:

1. The JobManager is responsible for cleaning old checkpoints, so it needs
access to all the files written out by all the task managers so it can
remove them.
2. For recovery/rescaling if all nodes share the same volume then
TaskManagers can read/redistribute the checkpoint data easily, since the
volume is shared.

Is that correct? Are there more aspects to why the directory must be shared
across the processes?

Thank you,
Rob Young

1.
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/checkpointing/#related-config-options


Re: End-to-end lag spikes when closing a large number of panes

2024-03-27 Thread Robert Metzger
Hey Caio,

Your analysis of the problem sounds right to me, I don't have a good
solution for you :(

I’ve validated that CPU profiles show clearAllState using a significant
> amount of CPU.


Did you use something like async-profiler here? Do you have more info on
the breakdown into what used the CPU time?
Once we know that, there might be an opportunity to do such operations
async/lazily, or fix something with the underlying platform (e.g. Rocksdb
is slow, ...)


On Thu, Mar 21, 2024 at 12:05 AM Caio Camatta via user <
user@flink.apache.org> wrote:

> Hey Asimansu,
>
> The inputDataStream is a KeyedStream, I forgot to mention that.
>
> Caio
>
> On Wed, Mar 20, 2024 at 6:56 PM Asimansu Bera 
> wrote:
>
>> Hello Caio,
>>
>> Based on the pseudocode, there is no keyed function present. Hence, the
>> window will not be processed parallely . Please check again and respond
>> back.
>>
>> val windowDataStream =
>>
>>   inputDataStream
>>
>> .window(TumblingEventTimeWindows of 1 hour)
>>
>> .trigger(custom trigger)
>>
>> .aggregate(
>>
>>preAggregator = custom AggregateFunction,
>>
>>windowFunction = custom ProcessWindowFunction
>>
>> )
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>>
>> -A
>>
>>
>> On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <
>> user@flink.apache.org> wrote:
>>
>>> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
>>> and we’re seeing lag spikes on window closure. I’m curious if others have
>>> encountered similar issues before and if anyone has suggestions for how to
>>> tackle this problem (other than simply increasing parallelism).
>>> ContextLag definition
>>>
>>> We define end-to-end lag as the delta between the time when the event
>>> was persisted in Kafka and the time when Flink finishes processing the
>>> event.
>>> Window operator definition
>>>
>>> The important parts (in pseudocode):
>>>
>>> val windowDataStream =
>>>
>>>   inputDataStream
>>>
>>> .window(TumblingEventTimeWindows of 1 hour)
>>>
>>> .trigger(custom trigger)
>>>
>>> .aggregate(
>>>
>>>preAggregator = custom AggregateFunction,
>>>
>>>windowFunction = custom ProcessWindowFunction
>>>
>>> )
>>>
>>> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e.
>>> we don’t run any user-defined logic at the end of the window. (This trigger
>>> only fires while the window is active via custom logic in onElement.)
>>> Numbers
>>>
>>> Our Flink app processes ~3K events per second and I’ve calculated that
>>> there are around 200-300K panes to close per Task at the end of the 1-hour
>>> window. Our lag is fairly stable at a few hundred milliseconds during
>>> the window but spikes to 5-10 seconds when the window expires, which is a
>>> problem for us.
>>> The issue
>>>
>>> The magnitude of the lag spikes on window closure correlate with
>>>
>>>-
>>>
>>>the size of the window (a 1-hour window has bigger spikes than a
>>>5-minute window.)
>>>-
>>>
>>>the cardinality of the keys in the event stream.
>>>-
>>>
>>>the number of events being processed per second.
>>>
>>> In other words, the more panes to close, the bigger the lag spike. I'm
>>> fairly sure that the lag is coming entirely from the WindowOperator’s
>>> clearAllState and I’ve validated that CPU profiles show clearAllState
>>> using a significant amount of CPU.
>>>
>>> Does this theory sound plausible? What could we do to minimize the
>>> effects of window clean-up? It would be nice to do it incrementally or
>>> asynchronously but I'm not sure if Flink provides this functionality today.
>>> Thanks,
>>> Caio
>>>
>>


Re: Discussion thread : Proposal to add Conditions in Flink CRD's Status field

2024-03-27 Thread Ahmed Hamdy
I am sorry it should be "d...@flink.apache.org"
Best Regards
Ahmed Hamdy


On Wed, 27 Mar 2024 at 13:00, Ahmed Hamdy  wrote:

> Hi Lajith,
> Could you please open the discussion thread against "d...@apache.flink.org",
> I believe it is better suited there.
> Best Regards
> Ahmed Hamdy
>
>
> On Wed, 27 Mar 2024 at 05:33, Lajith Koova  wrote:
>
>> 
>>
>> Hello,
>>
>>
>> Starting discussion thread here to discuss a proposal to add Conditions
>> field in the CR status of Flink Deployment and FlinkSessionJob.
>>
>>
>> Here is the google doc with details. Please provide your thoughts/inputs.
>>
>>
>>
>>
>> https://docs.google.com/document/d/12wlJCL_Vq2KZnABzK7OR7gAd1jZMmo0MxgXQXqtWODs/edit?usp=sharing
>>
>>
>> Thanks
>>
>>


Re: Discussion thread : Proposal to add Conditions in Flink CRD's Status field

2024-03-27 Thread Ahmed Hamdy
Hi Lajith,
Could you please open the discussion thread against "d...@apache.flink.org",
I believe it is better suited there.
Best Regards
Ahmed Hamdy


On Wed, 27 Mar 2024 at 05:33, Lajith Koova  wrote:

> 
>
> Hello,
>
>
> Starting discussion thread here to discuss a proposal to add Conditions
> field in the CR status of Flink Deployment and FlinkSessionJob.
>
>
> Here is the google doc with details. Please provide your thoughts/inputs.
>
>
>
> https://docs.google.com/document/d/12wlJCL_Vq2KZnABzK7OR7gAd1jZMmo0MxgXQXqtWODs/edit?usp=sharing
>
>
> Thanks
>
>


RE: need flink support framework for dependency injection

2024-03-27 Thread Schwalbe Matthias
Hi Ruibin,

Our code [1] targets a very old version of Flink (1.8); for current development 
my employer hasn't decided (yet?) to contribute it to the public.
That old code does not yet contain the abstractions for setting up state 
primitives, so let me sketch it here:

  *   Derive a specific implementation per operator from 
SetupDualUnboundedBoundedState
  *   All state primitive setup is then implemented in the respective open() 
function
  *   Derive the operator and the other parts (savepoint reader/writer) from this 
state setup class/trait
  *   For convenience there is a boundedMode field that tells the operator 
whether it runs in bounded/streaming mode (as the time semantics are similar yet 
different)
  *   This is one example where we ‘patched’ the non-public runtime 
implementation (mentioned in that other mail), therefore it needs to be 
maintained for each new Flink version 

Feel free to query details …

Sincere greetings

Thias


[1] https://github.com/VisecaCard/flink-commons
[2] common savepoint setup:
// Imports assumed for this sketch (not part of the original snippet):
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

/** Marker trait for flink functions/operators that can run in both Bounded 
(BATCH) and Unbounded (PIPELINED) mode,
* and for auxiliary functions for savepoint priming and reading.
*
* @note Derive a specific trait/mixin for each respective flink streaming 
function/operator that initializes
*   state primitives. Mixin that trait into auxiliary functions for 
savepoint priming and reading, to have a common
*   state initialization.
* @note Call 
[[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
*   in order to initialize this field.
*
* */
trait SetupDualUnboundedBoundedState extends Serializable {

  /** Determines at runtime, if the job DAG is running in Bounded (BATCH) or 
Unbounded (PIPELINED) mode.
   *
   * @note Call 
[[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
   *   in order to initialize this field.
   * */
  @transient var boundedMode = false

  /** Opens the respective function/operator for initialization of state 
primitives */
  def open(rtc: RuntimeContext): Unit = {
boundedMode =
  rtc match {
case src: StreamingRuntimeContext =>
  src.getTaskManagerRuntimeInfo.getConfiguration
.get[RuntimeExecutionMode](ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.BATCH
case _ => false
  }
  }
}




From: Ruibin Xing 
Sent: Wednesday, March 27, 2024 10:41 AM
To: Schwalbe Matthias 
Cc: Marco Villalobos ; Ganesh Walse 
; user@flink.apache.org
Subject: Re: need flink support framework for dependency injection

⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠


Hi Thias,

Could you share your approach to job setup using Spring, if that's possible? We 
also use Spring Boot for DI in jobs, primarily relying on profiles. I'm 
particularly interested in how you use the same job structures for different 
scenarios, such as reading savepoints. Thank you very much.

On Wed, Mar 27, 2024 at 3:12 PM Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>> wrote:
Hi Ganesh,

I tend to agree with Marco. However, your 'feature request' is very loose and 
leaves much room for misunderstanding.

There are at least two scenarios for DI integration:
- DI for job setup:
  - we use spring for job setup, which
- lets us use the same job structure for (at least) 4 scenarios: 
streaming job, batch job for savepoint priming, savepoint reading, 
transformation for complex schema changes -> savepoint writing
- we also appreciate a very convenient integration of a layered 
configuration by means of spring profiles
- we can easily replace e.g. sources and sinks for test/local 
develop/debug scenarios
- however this can also easily be done without DI
- our approach is public (Apache 2.0 license), if interested
- DI for Flink would probably be counterproductive for a number of reasons 
(some guesswork here  )
- from what I see, the Flink code base is separated into two clearly 
distinct parts: the public API, and the non-public implementation
- Flink community takes great efforts to guarantee backwards 
compatibility of the public API, which also allows for replacing the underneath 
implementation
- the private API mostly uses the Service-Locator pattern (sort of) 
also to make it harder to introduce arbitrary changes to the architecture, 
which would be hard to include into the backwards-compatibility-guaranties
- if need be, in most cases you can change the non-public implementation
- by implementing a set of replacement classes (quite tedious) 
and wire them in, but
- that forces you to re-integrate for every new version of 
Flink (even more tedious )
- we've done so in select cases that were not of interest for the 
general public,
- alternatively, if your extension use case is of 

Error when using FlinkML iterations with KeyedCoProcessFunction

2024-03-27 Thread Komal M
Hi,

As the DataStream API's iterativeStream method has been deprecated for future 
flink releases, the documentation recommends using Flink ML's iteration as an 
alternative. I am trying to build my understanding of the new iterations API as 
it will be a requirement for our future projects.

As an exercise, I’m trying to implement a KeyedRichCoProcessFunction inside the 
iteration body that takes the feedback stream and non-feedback stream as inputs, 
but I get the following error. Do you know what could be causing it? For 
reference, I do not get any error when applying .keyBy().flatMap() on 
the streams individually inside the iteration body.

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
….
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
…
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 5 more
Caused by: java.lang.ClassCastException: class 
org.apache.flink.iteration.IterationRecord cannot be cast to class 
org.apache.flink.api.java.tuple.Tuple 
(org.apache.flink.iteration.IterationRecord and 
org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
at 
org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
at 
org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
at 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)


I am attaching the full test code below for reference. All it does is subtract 
1 from the feedback stream until the tuples reach 0.0. For each subtraction 
it outputs a relevant message in the finaloutput stream. These messages are 
stored in the keyedState of KeyedCoProcessFunction and are preloaded in the 
parallel instances by a dataset stream called initialStates. For each key there 
are different messages associated with it, hence the need for MapState.



import java.util.*;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import 

Re: need flink support framework for dependency injection

2024-03-27 Thread Ruibin Xing
Hi Thias,

Could you share your approach to job setup using Spring, if that's
possible? We also use Spring Boot for DI in jobs, primarily relying on
profiles. I'm particularly interested in how you use the same job
structures for different scenarios, such as reading savepoints. Thank you
very much.


On Wed, Mar 27, 2024 at 3:12 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Ganesh,
>
> I tend to agree with Marco. However, your 'feature request' is very loose
> and leaves much room for misunderstanding.
>
> There are at least two scenarios for DI integration:
> - DI for job setup:
>   - we use spring for job setup, which
> - lets us use the same job structure for (at least) 4 scenarios:
> streaming job, batch job for savepoint priming, savepoint reading,
> transformation for complex schema changes -> savepoint writing
> - we also appreciate a very convenient integration of a layered
> configuration by means of spring profiles
> - we can easily replace e.g. sources and sinks for test/local
> develop/debug scenarios
> - however this can also easily be done without DI
> - our approach is public (Apache 2.0 license), if interested
> - DI for Flink would probably be counterproductive for a number of reasons
> (some guesswork here  )
> - from what I see, the Flink code base is separated into two
> clearly distinct parts: the public API, and the non-public implementation
> - Flink community takes great efforts to guarantee backwards
> compatibility of the public API, which also allows for replacing the
> underneath implementation
> - the private API mostly uses the Service-Locator pattern (sort
> of) also to make it harder to introduce arbitrary changes to the
> architecture, which would be hard to include into the
> backwards-compatibility-guaranties
> - if need be, in most cases you can change the non-public
> implementation
> - by implementing a set of replacement classes (quite
> tedious) and wire them in, but
> - that forces you to re-integrate for every new version of
> Flink (even more tedious )
> - we've done so in select cases that were not of interest for the
> general public,
> - alternatively, if your extension use case is of public interest, it is
> better to make a proposal for a change and negotiate agreement with the
> community of whether and how to implement it
> - we've also done so (recent example: [1])
>
> WDYT? What is your case for DI? ...
>
> Sincere greetings
>
> Thias
>
> [1] https://issues.apache.org/jira/browse/FLINK-26585
>
>
>
>
> -Original Message-
> From: Marco Villalobos 
> Sent: Tuesday, March 26, 2024 11:40 PM
> To: Ganesh Walse 
> Cc: user@flink.apache.org
> Subject: Re: need flink support framework for dependency injection
>
> Hi Ganesh,
>
> I disagree. I don’t think Flink needs a dependency injection framework. I
> have implemented many complex jobs without one. Can you please articulate
> why you think it needs a dependency injection framework, along with some
> use cases that will show its benefit?
>
> I would rather see more features related to stream programming,
> data-governance, file based table formats, or ML.
>
>
> > On Mar 26, 2024, at 2:27 PM, Ganesh Walse 
> wrote:
> >
> > 
> >
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Re: Flink job unable to restore from savepoint

2024-03-27 Thread Yanfei Lei
Hi Prashant,

Compared to the job that generated savepoint, are there any changes in
the new job? For example, data fields were added or deleted, or the
type serializer was changed?
More detailed job manager logs may help.

prashant parbhane  于2024年3月27日周三 14:20写道:
>
> Hello,
>
> We have been facing this weird issue of not being able to restore from 
> savepoint, when we have a significant load on flink jobs.
>
> "org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> user key.
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)
>
> Caused by: java.io.EOFException
>
> at 
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
>
> at org.apache.flink.types.StringValue.readString(StringValue.java:781)
>
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492"
>
>
> Any suggestions?
>
>
> Thanks,
> Prashant
>
>
>
>


-- 
Best,
Yanfei


Community Over Code NA 2024 Travel Assistance Applications now open!

2024-03-27 Thread Gavin McDonald
Hello to all users, contributors and Committers!

[ You are receiving this email as a subscriber to one or more ASF project
dev or user
  mailing lists and is not being sent to you directly. It is important that
we reach all of our
  users and contributors/committers so that they may get a chance to
benefit from this.
  We apologise in advance if this doesn't interest you but it is on topic
for the mailing
  lists of the Apache Software Foundation; and it is important please that
you do not
  mark this as spam in your email client. Thank You! ]

The Travel Assistance Committee (TAC) are pleased to announce that
travel assistance applications for Community over Code NA 2024 are now
open!

We will be supporting Community over Code NA in Denver, Colorado, from
October 7th to the 10th, 2024.

TAC exists to help those that would like to attend Community over Code
events, but are unable to do so for financial reasons. For more info
on this year's applications and qualifying criteria, please visit the
TAC website at < https://tac.apache.org/ >. Applications are already
open on https://tac-apply.apache.org/, so don't delay!

The Apache Travel Assistance Committee will only be accepting
applications from those people that are able to attend the full event.

Important: Applications close on Monday 6th May, 2024.

Applicants have until the closing date above to submit their
applications (which should contain as much supporting material as
required to efficiently and accurately process their request), this
will enable TAC to announce successful applications shortly
afterwards.

As usual, TAC expects to deal with a range of applications from a
diverse range of backgrounds; therefore, we encourage (as always)
anyone thinking about sending in an application to do so ASAP.

For those that will need a Visa to enter the Country - we advise you apply
now so that you have enough time in case of interview delays. So do not
wait until you know if you have been accepted or not.

We look forward to greeting many of you in Denver, Colorado, in October 2024!

Kind Regards,

Gavin

(On behalf of the Travel Assistance Committee)


Re: Flink job unable to restore from savepoint

2024-03-27 Thread Hangxiang Yu
Hi, Prashant.
Which Flink version did you use?
And Did you modify your job logic or configurations ? If yes, Could you
share changed things ?

On Wed, Mar 27, 2024 at 3:37 PM prashant parbhane 
wrote:

> Hello,
>
> We have been facing this weird issue of not being able to restore from
> savepoint, when we have a significant load on flink jobs.
>
> "
> *org.apache.flink.util.FlinkRuntimeException: Error while deserializing
> the user key.*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)*
>
> *Caused by: java.io.EOFException*
>
> *at
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)*
>
> *at
> org.apache.flink.types.StringValue.readString(StringValue.java:781)*
>
> *at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)*
>
> *at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492*
> "
>
>
> Any suggestions?
>
> Thanks,
> Prashant
>
>
>
>
>

-- 
Best,
Hangxiang.


RE: need flink support framework for dependency injection

2024-03-27 Thread Schwalbe Matthias
Hi Ganesh,

I tend to agree with Marco. However, your 'feature request' is very loose and 
leaves much room for misunderstanding.

There are at least two scenarios for DI integration:
- DI for job setup:
  - we use spring for job setup, which 
- lets us use the same job structure for (at least) 4 scenarios: 
streaming job, batch job for savepoint priming, savepoint reading, 
transformation for complex schema changes -> savepoint writing
- we also appreciate a very convenient integration of a layered 
configuration by means of spring profiles
- we can easily replace e.g. sources and sinks for test/local 
develop/debug scenarios
- however this can also easily be done without DI
- our approach is public (Apache 2.0 license), if interested
- DI for Flink would probably be counterproductive for a number of reasons 
(some guesswork here  )
- from what I see, the Flink code base is separated into two clearly 
distinct parts: the public API, and the non-public implementation
- Flink community takes great efforts to guarantee backwards 
compatibility of the public API, which also allows for replacing the underneath 
implementation
- the private API mostly uses the Service-Locator pattern (sort of) 
also to make it harder to introduce arbitrary changes to the architecture, 
which would be hard to include into the backwards-compatibility-guaranties
- if need be, in most cases you can change the non-public implementation
- by implementing a set of replacement classes (quite tedious) 
and wire them in, but 
- that forces you to re-integrate for every new version of 
Flink (even more tedious )
- we've done so in select cases that were not of interest for the 
general public,
- alternatively, if your extension use case is of public interest, it is better 
to make a proposal for a change and negotiate agreement with the community of 
whether and how to implement it
- we've also done so (recent example: [1])

WDYT? What is your case for DI? ...

Sincere greetings

Thias

[1] https://issues.apache.org/jira/browse/FLINK-26585




-Original Message-
From: Marco Villalobos  
Sent: Tuesday, March 26, 2024 11:40 PM
To: Ganesh Walse 
Cc: user@flink.apache.org
Subject: Re: need flink support framework for dependency injection

Hi Ganesh,

I disagree. I don’t think Flink needs a dependency injection framework. I have 
implemented many complex jobs without one. Can you please articulate why you 
think it needs a dependency injection framework, along with some use cases that 
will show its benefit?

I would rather see more features related to stream programming, 
data-governance, file based table formats, or ML.


> On Mar 26, 2024, at 2:27 PM, Ganesh Walse  wrote:
> 
> 
> 
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Flink job unable to restore from savepoint

2024-03-27 Thread prashant parbhane
Hello,

We have been facing this weird issue of not being able to restore from
savepoint, when we have a significant load on flink jobs.

"
*org.apache.flink.util.FlinkRuntimeException: Error while deserializing the
user key.*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)*

*Caused by: java.io.EOFException*

*at
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)*

*at org.apache.flink.types.StringValue.readString(StringValue.java:781)*

*at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)*

*at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492*
"


Any suggestions?

Thanks,
Prashant


[discuss] [jdbc] Support Ignore deleting is required?

2024-03-26 Thread ou...@139.com

When using the JDBC sink connector, there are many scenarios where we want to 
ignore delete (-D) records when writing out data. The JDBC sink connector 
therefore needs a sink.ignore-delete configuration attribute. The default 
value would be false (data deletion is not ignored), and the user can set it to true.

ou...@139.com
18896723...@139.com
18896723655


Discussion thread : Proposal to add Conditions in Flink CRD's Status field

2024-03-26 Thread Lajith Koova

Hello,

Starting discussion thread here to discuss a proposal to add Conditions field 
in the CR status of Flink Deployment and FlinkSessionJob. 

Here is the google doc with details. Please provide your thoughts/inputs. 

https://docs.google.com/document/d/12wlJCL_Vq2KZnABzK7OR7gAd1jZMmo0MxgXQXqtWODs/edit?usp=sharing

Thanks



Re: need flink support framework for dependency injection

2024-03-26 Thread Marco Villalobos
Hi Ganesh,

I disagree. I don’t think Flink needs a dependency injection framework. I have 
implemented many complex jobs without one. Can you please articulate why you 
think it needs a dependency injection framework, along with some use cases that 
will show its benefit?

I would rather see more features related to stream programming, 
data-governance, file based table formats, or ML.


> On Mar 26, 2024, at 2:27 PM, Ganesh Walse  wrote:
> 
> 
> 


need flink support framework for dependency injection

2024-03-26 Thread Ganesh Walse



Re: Temporal join on rolling aggregate

2024-03-26 Thread Matthias Broecheler
Hey Sebastien et al,

have you tried rewriting the rolling aggregate as a window-over query? A
window-over aggregation creates an append-only stream which should preserve
the timestamp/watermark of the source. You can then add a deduplication to
create a versioned state that you can use in a temporal join. The partition
key of the deduplication becomes the primary key to join on.
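
For illustration, the deduplication step could look roughly like this in the
Table API (table and column names are made up, and the datagen table only
stands in for the output of the window-over aggregation):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DedupVersionedTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical append-only input, standing in for the window-over aggregation output.
        tEnv.executeSql(
            "CREATE TABLE rolling_totals ("
                + "  id STRING,"
                + "  rolling_sum BIGINT,"
                + "  ts TIMESTAMP(3),"
                + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                + ") WITH ('connector' = 'datagen')");
        // Flink's deduplication pattern: keeping the latest row per id yields a
        // versioned table that can be used on the versioned side of a temporal join.
        tEnv.executeSql(
            "CREATE TEMPORARY VIEW versioned_totals AS "
                + "SELECT id, rolling_sum, ts FROM ("
                + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn"
                + "  FROM rolling_totals"
                + ") WHERE rn = 1");
    }
}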

I think what you are running into is that Flink creates a change-stream for
a group-by aggregation that has retractions. Temporal joins only work on
"versioned state" or lookup tables.

However, I think you have a valid point in a more foundational limitation
of FlinkSQL: It is currently not possible to create an append-only stream
from a change stream without going through the DataStream API. It would be
incredibly useful to support this natively in FlinkSQL. And Calcite has
support for the STREAM keyword to do this. Materialize (i.e. differential
dataflow) has a somewhat related construct in their SQL called a subscription.

We added support for this in DataSQRL (which is a streaming database compiler
that generates Flink jobs) using a syntax
that looks like this:
STREAM ON UPDATE AS (*YOUR BASE QUERY*)
to create an append-only stream from a change-stream by essentially dropping
all retractions and deletes (you can also do STREAM ON DELETE to get only
deletes, etc). However, I think this might be a feature that should live in
FlinkSQL instead and we'd be happy to create a FLIP and donate our
implementation if there is interest.

Cheers,
Matthias

On Mon, Mar 18, 2024 at 3:01 AM Sebastien  wrote:

> Hi everyone,
>
> Before digging into what it would it take to implement a general solution,
> I narrowed down the scope to write a fix which makes the query mentioned in
> the thread work. Here are some findings:
>
> - For the temporal join logic, it's not the watermark that matters but
> having a TimeIndicatorRelDataType column in the input relation. To address
> that, in the PR below, we introduced a call to the LAST_VALUE aggregate
> function to bring a timestamp column to the view. That makes the query
> works, but we think it is not enough. It would probably require a distinct
> aggregate function or a new syntax to be able to support more general use
> cases.
> - There is a relationship between the way the logical operators are
> reordered, the way the special Flink's Timestamp time is materialized and
> the watermark assigner.
> - I also looked into the flink-sql-parser and I found out that Flink has
> customized the parsing of the CREATE and DROP statements (
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd#L627-L638)
> (and Calcite supports as well support customizations for ALTER statements)
> but Calcite does not seem to support changes to the SELECT syntax (see
> https://issues.apache.org/jira/browse/CALCITE-4979). I mention it because
> I think it will inform what could be done syntax-wise.
>
> and a PR that highlights the changes with the complete investigation
> https://github.com/apache/flink/pull/24506
>
> This is more of a demonstration and I am looking to get feedback from
> someone who has more experience with the codebase.
>
> Thanks,
> Seb
>
> On Tue, Mar 5, 2024, at 10:07, Gyula Fóra wrote:
>
> Hi Everyone!
>
> I have discussed this with Sébastien Chevalley, he is going to prepare and
> drive the FLIP while I will assist him along the way.
>
> Thanks
> Gyula
>
> On Tue, Mar 5, 2024 at 9:57 AM  wrote:
>
> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
> On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
>
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in query, which can be declared for any type of
> source, such as table, view. Similar to databricks provided [1], this needs
> a FLIP.
>
> [1]
>
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron
>
>
>


Re: Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-26 Thread Yang Wang
Usually, you should use the HDFS nameservice instead of the NameNode
hostname:port to avoid NN failover.
And you could find the supported nameservice in the hdfs-site.xml in the
key *dfs.nameservices*.
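
For illustration, a minimal sketch with a made-up nameservice name (these keys
normally live in flink-conf.yaml; the programmatic form below is only to show
the shape of the URI):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NameserviceCheckpointSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "emr-nameservice" is a placeholder: use the dfs.nameservices value from the
        // cluster's hdfs-site.xml instead of a concrete namenode-host:8020.
        conf.setString("state.backend.type", "rocksdb");
        conf.setString("state.checkpoints.dir", "hdfs://emr-nameservice/flink-checkpoints");
        conf.setString("state.backend.incremental", "true");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000);
        // ... define and execute the job as usual ...
    }
}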


Best,
Yang

On Fri, Mar 22, 2024 at 8:33 PM Sachin Mittal  wrote:

> So, when we create an EMR cluster the NN service runs on the primary node
> of the cluster.
> Now at the time of creating the cluster, how can we specify the name of
> this NN in format hdfs://*namenode-host*:8020/.
>
> Is there a standard name by which we can identify the NN server ?
>
> Thanks
> Sachin
>
>
> On Fri, Mar 22, 2024 at 12:08 PM Asimansu Bera 
> wrote:
>
>> Hello Sachin,
>>
>> Typically, Cloud VMs are ephemeral, meaning that if the EMR cluster goes
>> down or VMs are required to be shut down for security updates or due to
>> faults, new VMs will be added to the cluster. As a result, any data stored
>> in the local file system, such as file://tmp, would be lost. To ensure data
>> persistence and prevent loss of checkpoint or savepoint data for recovery,
>> it is advisable to store such data in a persistent storage solution like
>> HDFS or S3.
>>
>> Generally, EMR based Hadoop NN runs on 8020 port. You may find the NN IP
>> details from EMR service.
>>
>> Hope this helps.
>>
>> -A
>>
>>
>> On Thu, Mar 21, 2024 at 10:54 PM Sachin Mittal 
>> wrote:
>>
>>> Hi,
>>> We are using AWS EMR where we can submit our flink jobs to a long
>>> running flink cluster on Yarn.
>>>
>>> We wanted to configure RocksDBStateBackend as our state backend to store
>>> our checkpoints.
>>>
>>> So we have configured following properties in our flink-conf.yaml
>>>
>>>- state.backend.type: rocksdb
>>>- state.checkpoints.dir: file:///tmp
>>>- state.backend.incremental: true
>>>
>>>
>>> My question here is regarding the checkpoint location: what is the
>>> difference between the location if it is a local filesystem vs a hadoop
>>> distributed file system (hdfs).
>>>
>>> What advantages we get if we use:
>>>
>>> *state.checkpoints.dir*: hdfs://namenode-host:port/flink-checkpoints
>>> vs
>>> *state.checkpoints.dir*: file:///tmp
>>>
>>> Also if we decide to use HDFS then from where we can get the value for
>>> *namenode-host:port*
>>> given we are running Flink on an EMR.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>


Re: There is no savepoint operation with triggerId

2024-03-25 Thread Yanfei Lei
Hi Lars,

It looks like the relevant logs when retrieving savepoint.
Have you frequently retrieved savepoints through the REST interface?

Lars Skjærven  于2024年3月26日周二 07:17写道:
>
> Hello,
> My job manager is constantly complaining with the following error:
>
> "Exception occurred in REST handler: There is no savepoint operation with 
> triggerId= for job bc0cb60b710e64e23195aa1610ad790a".
>
> "logger_name":"org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler"
>
> Checkpointing and savepointing seems to work as expected, but my logs are 
> flooded of these errors. Any tips ?
>
> L



-- 
Best,
Yanfei


There is no savepoint operation with triggerId

2024-03-25 Thread Lars Skjærven
Hello,
My job manager is constantly complaining with the following error:

"Exception occurred in REST handler: There is no savepoint operation with
triggerId= for job bc0cb60b710e64e23195aa1610ad790a".

"logger_name":"org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler"

Checkpointing and savepointing seems to work as expected, but my logs are
flooded of these errors. Any tips ?

L


Re: Unable to Generate Flame Graphs in Flink UI

2024-03-25 Thread Feng Jin
Hi Nitin

You don't need any additional configuration. After the sampling is
completed, the flame graph will be displayed.

Best,
Feng

On Mon, Mar 25, 2024 at 5:55 PM Nitin Saini 
wrote:

> Hi Flink Community,
>
> I've set up a Flink cluster using Flink version 1.17.2 and enabled the
> option "rest.flamegraph.enabled: true" to generate flame graphs. However,
> upon accessing the Flink UI, I encountered the message: "We are waiting for
> the first samples to create a flame graph."
>
> Could you kindly advise if there's any configuration step I might have
> overlooked? I've attached a screenshot of the flink UI for your reference.
>
> Best regards,
> Nitin Saini
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.8.0 released

2024-03-25 Thread Rui Fan
Congratulations! Thanks Max for the release and all involved for the great
work!

A gentle reminder to users: the maven artifact has just been released and
will take some time to complete.

Best,
Rui

On Mon, Mar 25, 2024 at 6:35 PM Maximilian Michels  wrote:

> The Apache Flink community is very happy to announce the release of
> the Apache Flink Kubernetes Operator version 1.8.0.
>
> The Flink Kubernetes Operator allows users to manage their Apache
> Flink applications on Kubernetes through all aspects of their
> lifecycle.
>
> Release highlights:
> - Flink Autotuning automatically adjusts TaskManager memory
> - Flink Autoscaling metrics and decision accuracy improved
> - Improve standalone Flink Autoscaling
> - Savepoint trigger nonce for savepoint-based restarts
> - Operator stability improvements for cluster shutdown
>
> Blog post:
> https://flink.apache.org/2024/03/21/apache-flink-kubernetes-operator-1.8.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Kubernetes Operator can be found at:
>
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>
> Official Docker image for Flink Kubernetes Operator can be found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12353866=12315522
>
> We would like to thank the Apache Flink community and its contributors
> who made this release possible!
>
> Cheers,
> Max
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.8.0 released

2024-03-25 Thread Maximilian Michels
The Apache Flink community is very happy to announce the release of
the Apache Flink Kubernetes Operator version 1.8.0.

The Flink Kubernetes Operator allows users to manage their Apache
Flink applications on Kubernetes through all aspects of their
lifecycle.

Release highlights:
- Flink Autotuning automatically adjusts TaskManager memory
- Flink Autoscaling metrics and decision accuracy improved
- Improve standalone Flink Autoscaling
- Savepoint trigger nonce for savepoint-based restarts
- Operator stability improvements for cluster shutdown

Blog post: 
https://flink.apache.org/2024/03/21/apache-flink-kubernetes-operator-1.8.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator can be found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12353866=12315522

We would like to thank the Apache Flink community and its contributors
who made this release possible!

Cheers,
Max


Custom metricReporterFactory could not be found in k8s deployment.

2024-03-22 Thread longfeng Xu
Hi all,
Background:
I created a custom metric reporter via Kafka.
It works in the IntelliJ IDEA local environment,
but fails when packaged and deployed in a k8s env (Ververica by Alibaba),
Flink 1.12.
Config:
metrics.reporter.kafka.factory.class:
org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka.server: 
metrics.reporter.kafka.interval: xxx
log:
reporterSetup  - The reporter
factory (org.apache.flink.metrics.kafka.KafkaRepoertFactory) could not be
found for reporter kafka.

It is confusing, and I checked the jar:
META-INF/services/org.apache.flink.metrics.MetricReporterFacotry is correct.
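
For reference, a minimal sketch of how the pieces have to line up (the
anonymous no-op reporter only keeps the sketch self-contained; the real factory
returns the Kafka-backed reporter):

package org.apache.flink.metrics.kafka; // must match metrics.reporter.kafka.factory.class

import java.util.Properties;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

// The fully qualified name of this class must appear, character for character, in
// META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory inside the
// reporter jar, and must match the factory.class entry in the Flink configuration.
public class KafkaReporterFactory implements MetricReporterFactory {
    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new MetricReporter() {
            @Override public void open(MetricConfig config) {}
            @Override public void close() {}
            @Override public void notifyOfAddedMetric(Metric metric, String name, MetricGroup group) {}
            @Override public void notifyOfRemovedMetric(Metric metric, String name, MetricGroup group) {}
        };
    }
}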

Could someone give me some advice? Thanks.


<    2   3   4   5   6   7   8   9   10   11   >