Using HybridSource

2023-07-03 Thread Oscar Perez via user
Hei, We want to bootstrap some data from a CSV file before reading from a
kafka topic that has a retention period of 7 days.

We believe the best tool for that would be the HybridSource, but the problem
we are facing is that the two sources produce different data types. The
KafkaSource returns a protobuf event while the CSV source returns a POJO with
just 3 fields.

We could hack the KafkaSource implementation and do the mapping from protobuf
to the CSV POJO in the value deserializer, but that seems rather hackish. Is
there a more elegant way to unify the two data types from both sources when
using HybridSource?

thanks
Oscar
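
For illustration, a rough sketch of why the element types need to line up:
HybridSource.builder is typed on a single element type, so both the file source
and the Kafka source have to emit the same class. This is only a sketch under
that assumption; UserEvent, ProtobufToUserEventSchema, csvFormat, brokers and
env are placeholder names, not existing classes.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

// Both sources are parameterized with the same element type (UserEvent).
FileSource<UserEvent> csvSource =
        FileSource.forRecordStreamFormat(csvFormat, new Path("s3://bucket/bootstrap/"))
                  .build();                       // csvFormat: a StreamFormat<UserEvent> for the CSV

KafkaSource<UserEvent> kafkaSource = KafkaSource.<UserEvent>builder()
        .setBootstrapServers(brokers)
        .setTopics("events")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new ProtobufToUserEventSchema())  // maps protobuf -> UserEvent
        .build();

HybridSource<UserEvent> hybridSource =
        HybridSource.builder(csvSource)           // read the CSV first
                    .addSource(kafkaSource)       // then switch over to Kafka
                    .build();

env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "csv-then-kafka");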


Difference between different values for starting offset

2023-07-03 Thread Oscar Perez via user
Hei,

Looking at the flink documentation for kafkasource I see the following
values for starting offset:

OffsetsInitializer.earliest()
OffsetsInitializer.latest()
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)

From what I understand, OffsetsInitializer.earliest() uses the earliest offset
the first time, but later deployments will use the committed offset in the
Flink state to resume from there. If that is the case, what is the difference
between OffsetsInitializer.earliest() and
committedOffsets(OffsetResetStrategy.EARLIEST) if both continue from the
committed offset after redeployment?

Thanks!
Oscar
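
For reference, a minimal sketch of how the two initializers are set on the
KafkaSource builder (broker, topic and group names are only examples):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("my-topic")
        .setGroupId("my-group")
        // Variant A: ignore any offsets committed to Kafka and start from the
        // earliest offset whenever there is no offset in Flink state.
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Variant B: prefer the offsets committed to Kafka for this group and
        // only fall back to earliest if no committed offset exists.
        // .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// In both variants, a job restored from a checkpoint/savepoint resumes from
// the offsets stored in Flink state; the initializer only applies when there
// is no such state.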


Re: Using HybridSource

2023-07-04 Thread Oscar Perez via user
ok, but is it? As I said, both sources have different data types. In the
example here:

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/

the example uses both sources returning String, but in our case one source
would return a protobuf event while the other would return a POJO. How can we
make the two sources share the same data type so that we can successfully use
HybridSource?

Regards,
Oscar

On Tue, 4 Jul 2023 at 12:04, Alexey Novakov  wrote:

> Hi Oscar,
>
> You could use connected streams and put your file into a special Kafka
> topic before starting such a job:
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
> But this may require more work and the event ordering (which is shuffled)
> in the connected streams is probably not what you are looking for.
>
> I think HybridSource is the right solution.
>
> Best regards,
> Alexey
>
> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user 
> wrote:
>
>> Hei, We want to bootstrap some data from a CSV file before reading from a
>> kafka topic that has a retention period of 7 days.
>>
>> We believe the best tool for that would be the HybridSource but the
>> problem we are facing is that both datasources are of different nature. The
>> KafkaSource returns a protobuf event while the CSV is a POJO with just 3
>> fields.
>>
>> We could hack the kafkasource implementation and then in the
>> valuedeserializer do the mapping from protobuf to the CSV POJO but that
>> seems rather hackish. Is there a way more elegant to unify both datatypes
>> from both sources using Hybrid Source?
>>
>> thanks
>> Oscar
>>
>


Re: Difference between different values for starting offset

2023-07-04 Thread Oscar Perez via user
Hei,
Ok, thanks. So if I understand this correctly, the difference between
OffsetsInitializer.earliest() and committedOffsets(OffsetResetStrategy.EARLIEST)
only shows up when there is no Flink state. In that case, earliest() will not
check the committed Kafka offset and will start from the earliest offset, while
the latter will use the committed offset from Kafka if there is one, is that
right? And if a committed offset is in the Flink state, that takes precedence
and will be used in either case, right?

Thanks,
Oscar

On Tue, 4 Jul 2023 at 02:56, Mason Chen  wrote:

> Hi Oscar,
>
> You are correct about the OffsetInitializer being only effective when
> there is no Flink state--in addition, if you have partition discovery on,
> this initializer will be reused for the new partitions (i.e. splits)
> discovered. Assuming the job is continuing from the offset in Flink state,
> there is no difference between the two strategies. This is because the
> `auto.offset.reset` maps to the `OffsetResetStrategy` and
> OffsetInitializer.earliest uses `earliest` too.
>
> Best,
> Mason
>
> On Mon, Jul 3, 2023 at 6:56 AM Oscar Perez via user 
> wrote:
>
>> Hei,
>>
>> Looking at the flink documentation for kafkasource I see the following
>> values for starting offset:
>>
>> OffsetInitializer.earliest
>> OffsetInitializer.latest
>> OffsetInitializer.commitedOffset(OffsetResetStrategy.EARLIEST)
>>
>> From what I understand OffsetInitializer.earliest uses earliest offset
>> the first time but later deployments will use the committed offset in the
>> flink state to resume from there. If that is the case what is the
>> difference between OffsetInitializer.earliest and
>> commitedOffset(OffsetResetStrategy.EARLIEST) if both continue from the
>> committed offset after redeployment?
>>
>> Thanks!
>> Oscar
>>
>


Re: Using HybridSource

2023-07-04 Thread Oscar Perez via user
Hei,
1) We populate state based on this CSV data and do business logic based on
this state and events coming from other, unrelated streams.
2) We are using the low-level ProcessFunction in order to process this
future hybrid source.

Regardless of the points above, please note that the main challenge is to
combine, in a HybridSource, a CSV file and a Kafka topic that return
different data types, so I don't know how my answers relate to the original
problem, to be honest. Regards,
Oscar

On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov 
wrote:

> @Oscar
> 1. How do you plan to use that CSV data? Is it needed for lookup from the
> "main" stream?
> 2. Which API are you using? DataStream/SQL/Table or low level
> ProcessFunction?
>
> Best,
> Alex
>
>
> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user 
> wrote:
>
>> ok, but is it? As I said, both sources have different data types. In the
>> example here:
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>
>> We are using both sources as returning string but in our case, one source
>> would return a protobuf event while the other would return a pojo. How can
>> we make the 2 sources share the same datatype so that we can successfully
>> use hybrid source?
>>
>> Regards,
>> Oscar
>>
>> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov  wrote:
>>
>>> Hi Oscar,
>>>
>>> You could use connected streams and put your file into a special Kafka
>>> topic before starting such a job:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>>> But this may require more work and the event ordering (which is
>>> shuffled) in the connected streams is probably not what you are looking for.
>>>
>>> I think HybridSource is the right solution.
>>>
>>> Best regards,
>>> Alexey
>>>
>>> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <
>>> user@flink.apache.org> wrote:
>>>
>>>> Hei, We want to bootstrap some data from a CSV file before reading from
>>>> a kafka topic that has a retention period of 7 days.
>>>>
>>>> We believe the best tool for that would be the HybridSource but the
>>>> problem we are facing is that both datasources are of different nature. The
>>>> KafkaSource returns a protobuf event while the CSV is a POJO with just 3
>>>> fields.
>>>>
>>>> We could hack the kafkasource implementation and then in the
>>>> valuedeserializer do the mapping from protobuf to the CSV POJO but that
>>>> seems rather hackish. Is there a way more elegant to unify both datatypes
>>>> from both sources using Hybrid Source?
>>>>
>>>> thanks
>>>> Oscar
>>>>
>>>


Re: Using HybridSource

2023-07-05 Thread Oscar Perez via user
and this is our case, Alexander: it is the same data schema but a different
data format. Kafka comes as protobuf while the CSV is a POJO, though both
have the same fields. IMHO, the design of HybridSource is very limited and
you have to do nasty workarounds if you want to combine cold storage
(CSV, S3) and Kafka whenever the expectations differ a bit from the most
common use case (e.g. using protobuf).

Regards,
Oscar

On Wed, 5 Jul 2023 at 12:53, Alexander Fedulov 
wrote:

> I do not think that trying to "squash" two different data types into one
> just to use HybridSource is the right thing to do here. HybridSource is
> primarily intended for use cases that need to read the same data from
> different sources. A typical example: read events from "cold storage" in S3
> up to a specific point and switch over to "live" data in Kafka.
> Since you are already using the low-level API, you can either
> manually pull the data in inside of the open() function, or stream it into
> the local state using KeyedCoProcessFunction or
> KeyedBroadcastProcessFunction (depending on the size of the lookup state).
>
> This video should get you covered:
> https://www.youtube.com/watch?v=cJS18iKLUIY
>
> Best,
> Alex
>
>
> On Wed, 5 Jul 2023 at 07:29, Péter Váry 
> wrote:
>
>> Was it a conscious decision that HybridSource only accept Sources, and
>> does not allow mapping functions applied to them before combining them?
>>
>> On Tue, Jul 4, 2023, 23:53 Ken Krugler 
>> wrote:
>>
>>> Hi Oscar,
>>>
>>> Couldn’t you have both the Kafka and File sources return an Either<POJO
>>> from CSV File, Protobuf from Kafka>, and then (after the HybridSource) use
>>> a MapFunction to convert to the unified/correct type?
>>>
>>> — Ken
>>>
>>>
>>> On Jul 4, 2023, at 12:13 PM, Oscar Perez via user 
>>> wrote:
>>>
>>> Hei,
>>> 1) We populate state based on this CSV data and do business logic based
>>> on this state and events coming from other unrelated streams.
>>> 2) We are using low level process function in order to process this
>>> future hybrid source
>>>
>>> Regardless of the aforementioned points, please note that the main
>>> challenge is to combine in a hybridsource CSV and kafka topic that return
>>> different datatypes so I dont know how my answers relate to the original
>>> problem tbh. Regards,
>>> Oscar
>>>
>>> On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov <
>>> alexander.fedu...@gmail.com> wrote:
>>>
>>>> @Oscar
>>>> 1. How do you plan to use that CSV data? Is it needed for lookup from
>>>> the "main" stream?
>>>> 2. Which API are you using? DataStream/SQL/Table or low level
>>>> ProcessFunction?
>>>>
>>>> Best,
>>>> Alex
>>>>
>>>>
>>>> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user <
>>>> user@flink.apache.org> wrote:
>>>>
>>>>> ok, but is it? As I said, both sources have different data types. In
>>>>> the example here:
>>>>>
>>>>>
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>>>>
>>>>> We are using both sources as returning string but in our case, one
>>>>> source would return a protobuf event while the other would return a pojo.
>>>>> How can we make the 2 sources share the same datatype so that we can
>>>>> successfully use hybrid source?
>>>>>
>>>>> Regards,
>>>>> Oscar
>>>>>
>>>>> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov 
>>>>> wrote:
>>>>>
>>>>>> Hi Oscar,
>>>>>>
>>>>>> You could use connected streams and put your file into a special
>>>>>> Kafka topic before starting such a job:
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>>>>>> But this may require more work and the event ordering (which is
>>>>>> shuffled) in the connected streams is probably not what you are looking 
>>>>>> for.
>>>>>>
>>>>>> I think HybridSource is the right solution.
>>>>>>
>>>>>> Best regards,
>>>>>> Alexey
>>>>>>
>>>>>> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <
>>>>>> user@flink.apache.org> wrote:
>>>>>>
>>>>>>> Hei, We want to bootstrap some data from a CSV file before reading
>>>>>>> from a kafka topic that has a retention period of 7 days.
>>>>>>>
>>>>>>> We believe the best tool for that would be the HybridSource but the
>>>>>>> problem we are facing is that both datasources are of different nature. 
>>>>>>> The
>>>>>>> KafkaSource returns a protobuf event while the CSV is a POJO with just 3
>>>>>>> fields.
>>>>>>>
>>>>>>> We could hack the kafkasource implementation and then in the
>>>>>>> valuedeserializer do the mapping from protobuf to the CSV POJO but that
>>>>>>> seems rather hackish. Is there a way more elegant to unify both 
>>>>>>> datatypes
>>>>>>> from both sources using Hybrid Source?
>>>>>>>
>>>>>>> thanks
>>>>>>> Oscar
>>>>>>>
>>>>>>
>>> --
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> Custom big data solutions
>>> Flink, Pinot, Solr, Elasticsearch
>>>
>>>
>>>
>>>
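
For illustration, a rough sketch of Ken's suggestion, relying on Flink's
built-in Either type: both sources are parameterized with the same
Either<CsvPojo, ProtoEvent> and a map after the HybridSource unifies the two
shapes. CsvPojo, ProtoEvent, UnifiedEvent and its fromCsv/fromProto factories
are hypothetical names, and csvSource/kafkaSource are assumed to already be
built with that Either element type.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Either;

// csvSource emits Either.Left(csvPojo), kafkaSource emits Either.Right(protoEvent).
HybridSource<Either<CsvPojo, ProtoEvent>> source =
        HybridSource.builder(csvSource)
                    .addSource(kafkaSource)
                    .build();

DataStream<UnifiedEvent> unified = env
        .fromSource(source, WatermarkStrategy.noWatermarks(), "bootstrap-then-kafka")
        .map(either -> either.isLeft()
                ? UnifiedEvent.fromCsv(either.left())
                : UnifiedEvent.fromProto(either.right()))
        .returns(UnifiedEvent.class);   // help type extraction for the lambda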


Dependency injection framework for flink

2023-08-01 Thread Oscar Perez via user
Hi,
we are currently migrating some of our jobs to a hexagonal architecture and
I have seen that we can use Spring as a dependency injection framework, see:

https://getindata.com/blog/writing-flink-jobs-using-spring-dependency-injection-framework/

Has anybody analyzed different JVM DI frameworks (e.g. Guice, Micronaut, etc.)
and their feasibility and performance on Apache Flink?

Using Google I have found some reported issues with Dagger and Flink, while
Guice/Spring seem better suited, but I could not find a performance study or
recommendations from the Flink community.

Thanks!
Oscar


Access to collector in the process function

2023-08-30 Thread Oscar Perez via user
Hi!
We would like to use hexagonal architecture in our design and treat the
collector as an output port when sending events from the use case.

For that, we would like to call an interface from the use case that
effectively sends the event ultimately via out.collect

The problem is that, to instantiate the use case, we need to inject the
collector as a dependency, and we don't have access to the collector at the
process function class level, only at the processElement method level.

Is there any way to access the collector from the process function class,
e.g. in the open method?

Regards,
Oscar
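
One common workaround, sketched below (not an official pattern; EventPublisher,
MyUseCase, InputEvent and OutputEvent are hypothetical names): the Collector is
only handed to processElement, so instead of injecting it in open() the output
port can be bound to the collector per invocation, with out::collect standing
in for the port.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Output port of the use case (hexagonal style); single method, so a
// method reference can implement it.
interface EventPublisher<T> {
    void publish(T event);
}

class MyUseCase {
    private final EventPublisher<OutputEvent> publisher;

    MyUseCase(EventPublisher<OutputEvent> publisher) {
        this.publisher = publisher;
    }

    void handle(InputEvent event) {
        // ... business logic ...
        publisher.publish(new OutputEvent(event.getId()));
    }
}

class MyProcessFunction extends KeyedProcessFunction<String, InputEvent, OutputEvent> {
    @Override
    public void processElement(InputEvent value, Context ctx, Collector<OutputEvent> out) {
        // The Collector only exists per invocation, so the use case is wired
        // here; out::collect satisfies the EventPublisher port.
        new MyUseCase(out::collect).handle(value);
    }
}

If the per-element allocation is a concern, the use case can instead be created
once in open() with a small mutable holder that processElement updates with the
current collector before delegating.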


Problem when testing table API in local

2023-09-08 Thread Oscar Perez via user
Hei flink community,

We are facing an issue with Flink 1.15, 1.16 and 1.16.2 (I tried these 3
versions with the same results, so maybe it is more general).

I am trying to test the Table API locally and for that I have the following
dependencies in my job. See the list of dependencies at the bottom of this
email.

When running the tests in local I get the following error:


org.apache.flink.table.api.TableException: Could not instantiate the
executor. Make sure a planner module is on the classpath
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factories that implement
'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.

But the planner module should be there, as you can see in the following
dependencies. I have also checked that it is there by looking at the runtime
classpath for the tests.

Any help would be greatly appreciated. Thanks!

implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
implementation "org.apache.flink:flink-table-runtime:${flinkVersion}"
implementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"

implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
implementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"

// Add test dependencies here.
testCompileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"


Re: Problem when testing table API in local

2023-09-08 Thread Oscar Perez via user
Hei,
Tried adding flink-clients like this, but I still get the same error :(

implementation "org.apache.flink:flink-clients:${flinkVersion}"


On Fri, 8 Sept 2023 at 16:30, Alexey Novakov  wrote:

> Hi,
>
> You would need to add the flink-clients module when running in local mode.
> The *flink-clients* dependency is only necessary to invoke the Flink
> program locally (for example to run it standalone for testing and
> debugging).
>
> Best regards,
> Alexey
>
> On Fri, Sep 8, 2023 at 3:17 PM Oscar Perez via user 
> wrote:
>
>> Hei flink community,
>>
>> We are facing an issue with flink 1.15, 1.16 or 1.16.2 (I tried these 3
>> versions with same results, maybe it is more general)
>>
>> I am trying to test table API in local and for that I have the following
>> dependencies in my job. See the list of dependencies at the bottom of this
>> email.
>>
>> When running the tests in local I get the following error:
>>
>>
>> org.apache.flink.table.api.TableException: Could not instantiate the
>> executor. Make sure a planner module is on the classpath
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factories that implement
>> 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
>>
>> But the planner module should be there as you can see in the following
>> dependencies. I have also checked that is there by looking at the runtime
>> classpath for tests.
>>
>> Any help would be greatly appreciated. Thanks!
>>
>> implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
>> implementation "org.apache.flink:flink-table-runtime:${flinkVersion}"
>> implementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
>>
>> implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
>> implementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"
>>
>> // Add test dependencies here.
>> testCompileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
>> testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
>> testImplementation "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
>> testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
>> testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
>> testImplementation 
>> "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
>>
>>


Re: Problem when testing table API in local

2023-09-08 Thread Oscar Perez via user
quick update,

after adding flink-clients it *worked* the first time I ran the test, but
the second time I got the same error again. Looks like a race condition or
a transient error?

Would love to get some hints on how to troubleshoot this one, if possible.
Thanks in advance!
Oscar

On Fri, 8 Sept 2023 at 16:38, Oscar Perez 
wrote:

> Hei,
> Tried adding flink-cients like this:
> still same error :(
>
> implementation "org.apache.flink:flink-clients:${flinkVersion}"
>
>
> On Fri, 8 Sept 2023 at 16:30, Alexey Novakov  wrote:
>
>> Hi,
>>
>> You would need to add the flink-clients module when running in local mode.
>> The *flink-clients* dependency is only necessary to invoke the Flink
>> program locally (for example to run it standalone for testing and
>> debugging).
>>
>> Best regards,
>> Alexey
>>
>> On Fri, Sep 8, 2023 at 3:17 PM Oscar Perez via user <
>> user@flink.apache.org> wrote:
>>
>>> Hei flink community,
>>>
>>> We are facing an issue with flink 1.15, 1.16 or 1.16.2 (I tried these 3
>>> versions with same results, maybe it is more general)
>>>
>>> I am trying to test table API in local and for that I have the following
>>> dependencies in my job. See the list of dependencies at the bottom of this
>>> email.
>>>
>>> When running the tests in local I get the following error:
>>>
>>>
>>> org.apache.flink.table.api.TableException: Could not instantiate the
>>> executor. Make sure a planner module is on the classpath
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not
>>> find any factories that implement
>>> 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
>>>
>>> But the planner module should be there as you can see in the following
>>> dependencies. I have also checked that is there by looking at the runtime
>>> classpath for tests.
>>>
>>> Any help would be greatly appreciated. Thanks!
>>>
>>> implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>> implementation "org.apache.flink:flink-table-runtime:${flinkVersion}"
>>> implementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
>>>
>>> implementation 
>>> "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
>>> implementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"
>>>
>>> // Add test dependencies here.
>>> testCompileOnly 
>>> "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
>>> testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
>>> testImplementation "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
>>> testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
>>> testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
>>> testImplementation 
>>> "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
>>>
>>>


e2e tests with flink

2023-09-11 Thread Oscar Perez via user
Hi,

I have a flink job which I want to test e2e.

In the test I start a Flink MiniCluster and the job reads from Kafka topics
running in Testcontainers. The problem I am facing is that for some topics the
starting offset is latest, and I want to publish the messages only after the
job has completely started, so that these messages can actually be read.

Is there a clean solution to send the payment event after the job has been
started? Currently I am using Thread.sleep for that, but I would like to await
on something; I just don't know what the trigger for that would be.

Regards,
Oscar
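
One possible alternative to Thread.sleep, as a rough sketch: submit the job
with executeAsync and poll the JobClient until the job reports RUNNING. Note
the caveat that RUNNING only means the job switched state; individual tasks
(and the Kafka consumer assignment) may still be starting up, so a small extra
delay or a readiness signal may still be needed. env, producer and
paymentRecord are placeholders for the test's own objects.

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;

JobClient jobClient = env.executeAsync("e2e-test-job");

// Wait until the job reports RUNNING before producing the events for the
// topics whose starting offset is "latest".
while (jobClient.getJobStatus().get() != JobStatus.RUNNING) {
    Thread.sleep(100);
}

// now publish the payment event to the Testcontainers Kafka broker
producer.send(paymentRecord).get();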


Profiling on flink jobs

2023-11-09 Thread Oscar Perez via user
Hi! I am trying to do profiling on one of our Flink jobs according to these
docs:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/
We are using OpenJDK 8.0. I am adding this line to the flink properties file
in docker-compose:

env.java.opts.all: "-XX:+UnlockCommercialFeatures
-XX:+UnlockDiagnosticVMOptions -XX:+FlightRecorder
-XX:+DebugNonSafepoints
-XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/dump.jfr"

I would expect to see the dump.jfr file created once I cancel the job, but
unfortunately I don't see anything created. How can I get a valid profile
file? Thanks!
Regards,
Oscar
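
A hedged note, in case it helps: on OpenJDK 8 (8u262 and later) JFR was
backported, but as far as I know the Oracle-only flags
-XX:+UnlockCommercialFeatures and -XX:FlightRecorderOptions=defaultrecording=...
are not supported there and may be rejected or ignored. Something along these
lines is what I would try instead (the path and options are just an example):

env.java.opts.all: "-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:StartFlightRecording=dumponexit=true,filename=/tmp/dump.jfr"

Also note that, as far as I understand, dumponexit only writes the file when
the JVM itself exits; cancelling a job does not stop a session-cluster
TaskManager, so the file may never appear in that setup. An alternative is to
trigger and dump a recording on a running TaskManager with
"jcmd <pid> JFR.start" and "jcmd <pid> JFR.dump".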


Doubts about state and table API

2023-11-24 Thread Oscar Perez via user
Hi,

We have a job in production where we use the Table API to join multiple
topics. The query looks like this:


SELECT *
FROM topic1 AS t1
JOIN topic2 AS t2 ON t1.userId = t2.userId
JOIN topic3 AS t3 ON t1.userId = t3.accountUserId


This works and produces an EnrichedActivity any time one of the topics
receives a new event, which is what we expect. This SQL query is connected to
a process function, and processElement gets triggered whenever a new
EnrichedActivity occurs.

We have experienced an issue a couple of times in production where we have
deployed a new version from a savepoint and then suddenly stopped receiving
EnrichedActivities in the process function.

Our assumption is that this is related to the Table API state and that some
operator state is lost when going from one savepoint to the new deployment.

Let me illustrate with one example:

version A of the job is deployed
version B of the job is deployed

In version B the UID of some Table API operators changes, and their state is
dropped when deploying version B as it cannot be mapped (we have
--allowNonRestoredState enabled)

The state for the Table API stores both the committed offsets and the
contents of the topics, but only the contents are lost while the committed
offsets remain in the state

Therefore, when executing the join in the query, it is not able to return any
row, as it is unable to get data from topic2 or topic3.

Can this be the case?
We are having a hard time trying to understand how the Table API and its
state work internally, so any help in this regard would be truly helpful!

Thanks,
Oscar


Re: Operator ids

2023-11-25 Thread Oscar Perez via user
Unfortunately you just can't, AFAIK.


On Sat, 25 Nov 2023 at 14:45, rania duni  wrote:

> Hello!
>
> I would like to know how can I get the operator ids of a running job. I
> know how can I get the task ids but I want the operator ids! I couldn’t
> find something to the REST API docs.
> Thank you.


Checkpoint RMM

2023-11-27 Thread Oscar Perez via user
Hi,

We have a long running job in production and we are trying to understand
the metrics for this job, see attached screenshot.

We have enabled incremental checkpoint for this job and we use RocksDB as a
state backend.

When deployed from a fresh state, the initial checkpoint size is about *2.41 GB*.
I guess most of the contents come from the Table API reading a bunch of
topics from earliest.

Few data regarding the graph:

The full checkpoint size grows from *2.41 GB* on September 21st to *11.6 GB*
on November 14th.
The last checkpoint size (incremental checkpoint) goes from *232 MB* on
September 21st to *2.35 GB* on November 14th. We take incremental
checkpoints every 30 seconds.
The time it takes to take a checkpoint goes from *1.66 seconds* on
September 21st to *10.85 seconds* on November 14th.

Few things we dont understand

Why does the incremental checkpoint size keep increasing? My assumption would
be that the deltas are linear and the incremental checkpoint size would remain
around 230 MB, but it keeps increasing over time until it reaches 2.35 GB!

The full checkpoint size does not completely make sense either. If each
incremental checkpoint keeps increasing linearly, I would expect the full
checkpoint size to increase much faster, as the full checkpoint size is the
sum of all incremental checkpoints and we take an incremental checkpoint
every 30 seconds.

The time it takes to take a checkpoint correlates with the incremental
checkpoint sizes. The bigger the incremental checkpoint, the longer it takes
to store it, but we don't understand why the incremental checkpoint keeps
increasing. Is this something related to Table API internals?

Thanks for any help that could be given!
Regards,
Oscar


Metrics not available

2023-11-27 Thread Oscar Perez via user
Hi,

We are using Flink 1.16 and we would like to monitor the state metrics of a
certain job. Looking at the documentation I see that there are some state
access latency metrics:

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/metrics/

Namely, I would like to access the following:

*mapStateGetLatency*

How can I access this metric? I would expect it to be exposed as

flink.task/operator.mapStateGetLatency, but I don't see any related state
access latency metrics available. Does this need to be enabled/configured in
some way?

Thanks,
Oscar
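
As far as I know, state access latency tracking is disabled by default and has
to be switched on in the Flink configuration before the metrics show up under
the task/operator metric group. A minimal sketch (the sample interval value is
only an example and worth double-checking against the docs of your version):

state.backend.latency-track.keyed-state-enabled: true
state.backend.latency-track.sample-interval: 100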


Advice on checkpoint interval best practices

2023-12-05 Thread Oscar Perez via user
Hei,

We are tuning some of the Flink jobs we have in production and we would
like to know what the best numbers/considerations for the checkpoint
interval are. We have set a default checkpoint interval of 30 seconds, and
the checkpoint operation takes around 2 seconds.
We have also enabled incremental checkpoints. I understand there is a
tradeoff between recovery time after a failure and the performance
degradation of an aggressive checkpoint policy, but I would like to know
what you consider a good compromise.

I read this article as reference:
https://shopify.engineering/optimizing-apache-flink-applications-tips

But what I would like is some formula or recipe in order to find out the
best value for checkpoint interval.

Regards,
Oscar
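
Not a formula, but for reference, a minimal sketch of the knobs that are
usually tuned together (the values are only an example, not a recommendation):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(30_000);                                   // checkpoint interval: 30 s
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(15_000);   // breathing room between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(600_000);           // abort checkpoints slower than 10 min
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);  // tolerate a few failed checkpoints

The min-pause is often the more useful guard than the interval itself, since it
bounds the fraction of time the job can spend checkpointing even when a single
checkpoint is slow.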


Feature flag functionality on flink

2023-12-07 Thread Oscar Perez via user
Hi,
We would like to enable a sort of feature-flag functionality for Flink jobs.

The idea would be to use broadcast state reading from a configuration topic
and then ALL operators with logic would listen to this state.

This documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/broadcast_state/

explains how a single operator can use this broadcast state, but the problem
we are having is understanding how we can share the same state across many
different operators. One way is to create multiple broadcast streams, one per
operator, all reading from the same topic, and then connect each one to its
operator in a KeyedBroadcastProcessFunction, but this seems like overkill.

Is there an elegant solution to this problem?
regards,
Oscar
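
One possible approach, sketched under the assumption that connecting the same
BroadcastStream to several keyed streams is acceptable for the job (names such
as FeatureFlag, flagConfigStream, keyedOrders/keyedPayments and the two process
functions are placeholders): broadcast the configuration topic once and connect
that single BroadcastStream to every keyed stream that needs the flags. Each
connected operator then receives its own read-only copy of the same broadcast
state, while the topic is only read once.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;

MapStateDescriptor<String, FeatureFlag> flagsDescriptor =
        new MapStateDescriptor<>("feature-flags", String.class, FeatureFlag.class);

// One source / one broadcast of the configuration topic...
BroadcastStream<FeatureFlag> flags = flagConfigStream.broadcast(flagsDescriptor);

// ...connected to several independent keyed streams.
keyedOrders.connect(flags)
           .process(new OrdersWithFlagsFunction(flagsDescriptor));     // a KeyedBroadcastProcessFunction

keyedPayments.connect(flags)
             .process(new PaymentsWithFlagsFunction(flagsDescriptor)); // another KeyedBroadcastProcessFunction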


Sending key with the event

2024-01-23 Thread Oscar Perez via user
Hi flink experts!

I have a question regarding Apache Flink. We want to send an event to a
certain topic, but for some reason we fail to send a proper key with the
event.

The event is published properly to the topic, but the key for this event is
null. I only see the method out.collect(event) to publish the event.

What should we do in order to see the key in the published message?

Thanks and regards,
Oscar
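
In case the events are written with a KafkaSink: as far as I can tell,
out.collect only forwards the value downstream, and the key of the Kafka record
is decided by the sink's record serializer. A rough sketch (UserEvent,
UserEventSerializationSchema, brokers and stream are placeholder names):

import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<UserEvent> sink = KafkaSink.<UserEvent>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new UserEventSerializationSchema())
                // the Kafka record key is produced here, not via out.collect
                .setKeySerializationSchema(
                        (UserEvent event) -> event.getUserId().getBytes(StandardCharsets.UTF_8))
                .build())
        .build();

stream.sinkTo(sink);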


How to list operators and see UID

2024-04-03 Thread Oscar Perez via user
Hei,

We are facing an issue with one of the jobs in production where it fails to
map state from one deployment to another. I guess the problem is that we
failed to set a UID, so it relies on the default of generating one based on a
hash.

Is it possible to see all operators / UIDs at a glance? What is the best way
to list all operators and their associated UIDs? Do you print them
programmatically, or are they accessible via the Flink UI?

What is the recommended way to do forensics, i.e. to take a UID and see which
operator it relates to?

Thanks for the tips!
Oscar
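
For what it's worth, a small sketch of pinning UIDs explicitly so that state
mapping no longer depends on the auto-generated hash (stream, Event and
EnrichmentFunction are placeholder names):

stream
    .keyBy(event -> event.getUserId())
    .process(new EnrichmentFunction())
    .uid("enrichment-process")   // stable identifier used for state mapping across deployments
    .name("Enrichment");         // human-readable name shown in the Flink UI

// pipeline.auto-generated-uids: false  <- config option that is supposed to make
// the job fail fast when a stateful operator is missing an explicit uid; worth
// double-checking against the docs of your Flink version.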


Impact on using clean code and serializing everything

2024-04-04 Thread Oscar Perez via user
Hi,

We would like to adhere to clean-code principles and expose all dependencies
in the constructor of our process functions.

In Flink, however, all dependencies passed to process functions must be
serializable. The usual workaround is to instantiate these dependencies in
the open method of the process function and declare the fields transient.

I wonder how declaring all dependencies in the constructor and making them
serializable would impact the performance of the job. Is this a wrong
pattern? Has anybody run experiments on the performance degradation of
dependencies passed via the constructor vs. instantiated in the open method?

Thanks!
Oscar
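
For comparison, a small sketch of the usual split (Event, EnrichedEvent,
PricingClient and PricingClientConfig are placeholder names): the constructor
argument must be serializable because the function instance is serialized into
the job graph and shipped to the task managers, while the transient field is
rebuilt per task in open() and never serialized. As far as I understand, that
serialization happens at submission/deployment time, not per record, so the
cost of constructor injection is mostly about job-graph size and what can
legally be serialized, not per-record throughput.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

class EnrichFunction extends KeyedProcessFunction<String, Event, EnrichedEvent> {

    private final PricingClientConfig config;   // small, Serializable: shipped with the job graph
    private transient PricingClient client;     // heavy / non-serializable: built on the task manager

    EnrichFunction(PricingClientConfig config) {
        this.config = config;
    }

    @Override
    public void open(Configuration parameters) {
        client = new PricingClient(config);     // once per parallel task instance, not per element
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<EnrichedEvent> out) {
        out.collect(new EnrichedEvent(value, client.priceFor(value)));
    }
}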


Re: Flink job performance

2024-04-15 Thread Oscar Perez via user
Hi,
I appreciate your comments, thank you. My original question still remains,
though: why did the very same job, just by changing the aforementioned
settings, show this increase in CPU usage and performance degradation, when
we would have expected the opposite behaviour?

thanks again,
Oscar

On Mon, 15 Apr 2024 at 15:11, Zhanghao Chen 
wrote:

> The exception basically says the remote TM is unreachable, probably
> terminated due to some other reasons. This may not be the root cause. Is
> there any other exceptions in the log? Also, since the overall resource
> usage is almost full, could you try allocating more CPUs and see if the
> instability persists?
>
> Best,
> Zhanghao Chen
> --
> *From:* Oscar Perez 
> *Sent:* Monday, April 15, 2024 19:24
> *To:* Zhanghao Chen 
> *Cc:* Oscar Perez via user 
> *Subject:* Re: Flink job performance
>
> Hei, ok that is weird. Let me resend them.
>
> Regards,
> Oscar
>
> On Mon, 15 Apr 2024 at 14:00, Zhanghao Chen 
> wrote:
>
> Hi, there seems to be sth wrong with the two images attached in the latest
> email. I cannot open them.
>
> Best,
> Zhanghao Chen
> ------
> *From:* Oscar Perez via user 
> *Sent:* Monday, April 15, 2024 15:57
> *To:* Oscar Perez via user ; pi-team <
> pi-t...@n26.com>; Hermes Team 
> *Subject:* Flink job performance
>
> Hi community!
>
> We have an interesting problem with Flink after increasing parallelism in
> a certain way. Here is the summary:
>
> 1) We identified that our job bottleneck was some co-keyed process
> operators that were affecting the previous operators, causing backpressure.
> 2) What we did was increase the parallelism of all the operators from 6
> to 12, but keep it at 6 for the operators that read from Kafka. The main
> reason was that all our topics have 6 partitions, so increasing that
> parallelism would not yield better performance.
>
> See attached job layout prior and after the changes:
> What happened was that some operations that were chained in the same
> operator, like read - filter - map - filter, are now rebalanced, and the
> overall performance of the job is suffering (it keeps throwing exceptions
> now and then).
>
> Is the rebalance operation going over the network or this happens in the
> same node? How can we effectively improve performance of this job with the
> given resources?
>
> Thanks for the input!
> Regards,
> Oscar
>
>
>