Unsubscribe

2022-04-08 Thread Samir Vasani



Re: Flink Kafka Issue with EXACTLY_ONCE semantics

2022-04-08 Thread Frank Dekervel
Hello,

Check whether your topic's replication factor is below Kafka's min.insync.replicas
(min ISR) setting. I had the same problem and that was the cause for me.

Frank

On Sat, 9 Apr 2022 at 04:01, Praneeth Ramesh wrote:

> Hi All
>
> I have a job which reads from Kafka, applies some transformations, and
> writes the data back to Kafka topics.
> When I use the exactly once semantics I see that the kafka producer is not
> initialized and the operator task is hanging in INITIALIZING state and
> fails eventually.
>
> It works fine when I change the mode to AT_LEAST_ONCE. I see that in this
> mode the transactionId is null in the producer config and
> enable.idempotence is false.
>
> But when I enable EXACTLY_ONCE, I see that the producer config has the
> transactionId which I set and enable.idempotence set to true. From the trace
> logs, what I can see is that the transaction manager keeps initializing and
> reinitializing the transactionId again and again and does not make any
> progress.
>
> I tried to analyze the thread dump and see that there are threads blocked
> at
>  at
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:68)
> at
> org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:632)
> at
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.initTransactionId(FlinkKafkaInternalProducer.java:156)
>
>
> I made sure that there are no ACL problems with the brokers. Any idea what
> could be the problem?
>
> Attaching the TM logs where the application is reinitializing the
> transactions.
>
>
> Thanks for any help or pointers.
>
>
>
>
> --
> Regards
> Praneeth Ramesh
>
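(Illustrative note, not from the thread: a minimal sketch of how an exactly-once
KafkaSink is typically configured with the Flink 1.14/1.15 Kafka connector. The
broker address, topic, transactional-id prefix, and timeout value are placeholders.
Two settings frequently involved in hanging or failing initTransactions() calls are
the producer's transaction.timeout.ms, which must not exceed the broker's
transaction.max.timeout.ms, and the min.insync.replicas requirement mentioned above.)

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceSinkSketch {

    public static KafkaSink<String> buildSink() {
        Properties props = new Properties();
        // Keep this at or below the broker's transaction.max.timeout.ms (15 min by default).
        props.setProperty("transaction.timeout.ms", "900000");

        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")               // placeholder
                .setKafkaProducerConfig(props)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")         // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // A unique, stable prefix per application for transactional ids.
                .setTransactionalIdPrefix("my-app-tx")
                .build();
    }
}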


Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Alexis Sarda-Espinosa
Hi Roman,

Here's an example of a WindowReaderFunction:

public class StateReaderFunction extends WindowReaderFunction<Integer, Integer, String, TimeWindow> {
    private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
            "descriptorId",
            Integer.class
    );

    @Override
    public void readWindow(String s, Context context,
            Iterable<Integer> elements, Collector<Integer> out) throws Exception {
        int count = 0;
        for (Integer i : context.windowState().getListState(LSD).get()) {
            count++;
        }
        out.collect(count);
    }
}

That's for the operator that uses window state. The other readers do something 
similar but with context.globalState(). That should provide the number of state 
entries for each key+window combination, no? And after collecting all results, 
I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
context.windowState().getListState(...).clear().

Side note: in the state processor program I call 
ExecutionEnvironment#setParallelism(1) even though my streaming job runs with 
parallelism=4, this doesn't affect the result, does it?

Regards,
Alexis.
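(Illustrative note, not part of the original message: a minimal sketch of the
readKeyedState approach Roman suggests below, which reads keyed state entries
directly instead of going through the window reader. The uid, state name, and
types are placeholders.)

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class KeyedStateCounter extends KeyedStateReaderFunction<String, Integer> {

    private transient ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) {
        // Must match the descriptor used by the streaming job (placeholder name here).
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("descriptorId", Integer.class));
    }

    @Override
    public void readKey(String key, Context context, Collector<Integer> out) throws Exception {
        int count = 0;
        for (Integer ignored : listState.get()) {
            count++;
        }
        out.collect(count);
    }
}

// Usage sketch (the reader job's parallelism does not change the summed totals):
// savepoint.readKeyedState("operator-uid", new KeyedStateCounter(), Types.STRING, Types.INT)
//          .collect().stream().reduce(0, Integer::sum);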


From: Roman Khachatryan 
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev 
> environment that has been running for the last week and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with 
> the largest state (614MB) come from keyed sliding windows. Some attributes of 
> this job’s setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState and no TTL.
> Global window state with MapState> and a TTL of 1 hour 
> (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState where the Pojo has an int and a 
> long, a TTL of 1 hour, and configured with 
> cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old 
> state in addition to configured TTL. The other operator does override and 
> call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and 
> I’ll note here that I see 50 folders named chk-*** even though I don’t set 
> state.checkpoints.num-retained and the default should be 1. I loaded the data 
> from the folder with the highest chk number and I see that my operators have 
> these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(...)
>
> .collect()
>
> .parallelStream()
>
> .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in 
> each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: DataStream request / response

2022-04-08 Thread Jason Thomas
I will dig deeper into Statefun. Also, yes, for now I can try the
Spring/Kafka solution if Statefun doesn't fit.

Austin - as far as rewriting our microservices in Flink, here are some things I
was looking for:

- We need to be able to easily share/transform data with other teams.
Flink SQL seems really nice for this.  We also have use cases for real-time
analytics within our own application.
- If a Flink job is down temporarily due to redeployment, it can just pick
up where it left off.  With microservices, data gets lost/corrupted.
- I'm trying to help improve developer productivity, have better auditing
and logging, improve testing, etc.  An event driven architecture obviously
isn't required to have these things, but it should help.
- My intuition is that Flink will have lower hosting costs, but I haven't
tested this yet.

Thanks everyone for the help, I really appreciate it!

-Jason

On Fri, Apr 8, 2022 at 2:34 PM Roman Khachatryan  wrote:

> It seems to be possible now with RequestReplyHandlers from Java SDK
> [1] (or other SDKs) unless I'm missing something.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/java/#serving-functions
>
> Regards,
> Roman
>
> On Fri, Apr 8, 2022 at 7:45 PM Austin Cawley-Edwards
>  wrote:
> >
> > Good suggestion – though a common misconception with Statefun is that
> HTTP ingestion is possible. Last time I checked it was still under
> theoretical discussion. Do you know the current  state there?
> >
> > Austin
> >
> > On Fri, Apr 8, 2022 at 1:19 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Besides the solution suggested by Austin, you might also want to look
> >> at Stateful Functions [1]. They provide a more convenient programming
> >> model for the use-case I think, while DataStream is a relatively
> >> low-level API.
> >>
> >> [1]
> >> https://nightlies.apache.org/flink/flink-statefun-docs-stable/
> >>
> >> Regards,
> >> Roman
> >>
> >> On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
> >>  wrote:
> >> >
> >> > Hi Jason,
> >> >
> >> > No, there is no HTTP source/ sink support that I know of for Flink.
> Would running the Spring + Kafka solution in front of Flink work for you?
> >> >
> >> > On a higher level, what drew you to migrating the microservice to
> Flink?
> >> >
> >> > Best,
> >> > Austin
> >> >
> >> > On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas <
> katsoftware...@gmail.com> wrote:
> >> >>
> >> >> I'm taking an existing REST based microservice application and
> moving all of the logic into Flink DataStreams.
> >> >>
> >> >> Is there an easy way to get a request/response from a Flink
> DataStream so I can 'call' into it from a REST service?   For example,
> something similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
> >> >>
> >> >> Thanks for any help!
> >> >>
> >> >> -Jason
> >> >>
>


Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Roman Khachatryan
Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev 
> environment that has been running for the last week and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with 
> the largest state (614MB) come from keyed sliding windows. Some attributes of 
> this job’s setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState and no TTL.
> Global window state with MapState> and a TTL of 1 hour 
> (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState where the Pojo has an int and a 
> long, a TTL of 1 hour, and configured with 
> cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old 
> state in addition to configured TTL. The other operator does override and 
> call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and 
> I’ll note here that I see 50 folders named chk-*** even though I don’t set 
> state.checkpoints.num-retained and the default should be 1. I loaded the data 
> from the folder with the highest chk number and I see that my operators have 
> these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(...)
>
> .collect()
>
> .parallelStream()
>
> .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in 
> each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: Error during shutdown of StandaloneApplicationClusterEntryPoint via JVM shutdown hook

2022-04-08 Thread Roman Khachatryan
I'd try to increase the value, so that the timeout doesn't happen
during the shutdown.

Regards,
Roman
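For illustration only (the value here is arbitrary; the default is 5 min), the
option would go into flink-conf.yaml as:

rest.async.store-duration: 15 min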

On Fri, Apr 8, 2022 at 7:50 PM Alexey Trenikhun  wrote:
>
> Hi Roman,
> Currently rest.async.store-duration is not set. Are you suggesting decreasing
> the value from the default, or increasing it?
>
> Thanks,
> Alexey
> 
> From: Roman Khachatryan 
> Sent: Friday, April 8, 2022 5:32:45 AM
> To: Alexey Trenikhun 
> Cc: Flink User Mail List 
> Subject: Re: Error during shutdown of StandaloneApplicationClusterEntryPoint 
> via JVM shutdown hook
>
> Hello,
>
> Unfortunately, it's difficult to name the exact reason why the timeout
> happens because there's no message logged.
> I've opened a ticket to improve the logging [1].
> There, I also listed some code paths that might lead to this situation.
>
> From the described scenario, I'd suppose that it's
> CompletedOperationCache.closeAsync()  that times out. It can be
> verified or maybe mitigated by changing rest.async.store-duration  [2]
> (the default is 5 minutes).
> Could you check that?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-27144
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rest-async-store-duration
>
> Regards,
> Roman
>
> On Wed, Apr 6, 2022 at 5:21 PM Alexey Trenikhun  wrote:
> >
> > Hello,
> >
> > We are using Flink 1.13.6, Application Mode, k8s HA. To upgrade job, we use 
> > POST, 
> > url=http://gsp-jm:8081/jobs//savepoints, 
> > then we wait for up to 5 minutes for completion, periodically pulling 
> > status (GET, 
> > url=http://gsp-jm:8081/jobs//savepoints/{triggerId}).
> >  If savepoint is not complete in 5 minute, we cancel job (PATCH, 
> > url=http://gsp-jm:8081/jobs/000). Usually it 
> > works well, job stopped one way or another and we proceed with upgrade, but 
> > currently JM exits with code -2, and as result k8s restarts pod. We tried 
> > multiple times, but every time getting -2. JM log is below (newest messages 
> > on top):
> >
> > 2022-04-06T14:21:17.465Z Error during shutdown of 
> > StandaloneApplicationClusterEntryPoint via JVM shutdown hook.
> > java.util.concurrent.CompletionException: 
> > java.util.concurrent.TimeoutException
> >  at 
> > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >  at 
> > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >  at 
> > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661)
> >  at 
> > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> >  at 
> > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> >  at 
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.closeAsync(ClusterEntrypoint.java:379)
> >  at 
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$new$0(ClusterEntrypoint.java:168)
> >  at 
> > org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
> >  at java.lang.Thread.run(Thread.java:750) Caused by: 
> > java.util.concurrent.TimeoutException: null
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
> >  at 
> > org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
> >  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >  at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >  at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >  at 
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >  at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >  ... 1 common frames omitted
> > 2022-04-06T14:21:17.464Z Terminating cluster entrypoint process 
> > StandaloneApplicationClusterEntryPoint with exit code 2.
> > java.util.concurrent.TimeoutException: null
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
> >  at 
> > org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
> >  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >  at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >  at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Sch

Re: DataStream request / response

2022-04-08 Thread Roman Khachatryan
It seems to be possible now with RequestReplyHandlers from Java SDK
[1] (or other SDKs) unless I'm missing something.

[1]
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/java/#serving-functions

Regards,
Roman

On Fri, Apr 8, 2022 at 7:45 PM Austin Cawley-Edwards
 wrote:
>
> Good suggestion – though a common misconception with Statefun is that HTTP 
> ingestion is possible. Last time I checked it was still under theoretical 
> discussion. Do you know the current  state there?
>
> Austin
>
> On Fri, Apr 8, 2022 at 1:19 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Besides the solution suggested by Austin, you might also want to look
>> at Stateful Functions [1]. They provide a more convenient programming
>> model for the use-case I think, while DataStream is a relatively
>> low-level API.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-statefun-docs-stable/
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
>>  wrote:
>> >
>> > Hi Jason,
>> >
>> > No, there is no HTTP source/ sink support that I know of for Flink. Would 
>> > running the Spring + Kafka solution in front of Flink work for you?
>> >
>> > On a higher level, what drew you to migrating the microservice to Flink?
>> >
>> > Best,
>> > Austin
>> >
>> > On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas  
>> > wrote:
>> >>
>> >> I'm taking an existing REST based microservice application and moving all 
>> >> of the logic into Flink DataStreams.
>> >>
>> >> Is there an easy way to get a request/response from a Flink DataStream so 
>> >> I can 'call' into it from a REST service?   For example, something 
>> >> similar to this Kafka streams example that uses Spring 
>> >> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
>> >>
>> >> Thanks for any help!
>> >>
>> >> -Jason
>> >>


Re: HDFS streaming source concerns

2022-04-08 Thread Roman Khachatryan
Hi Carlos,

AFAIK, Flink FileSource is capable of checkpointing while reading the
files (at least in Streaming Mode).
As for the watermarks, I think FLIP-182 [1] could solve the problem;
however, it's currently under development.

I'm also pulling in Arvid and Fabian who are more familiar with the subject.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

Regards,
Roman

On Wed, Apr 6, 2022 at 4:17 PM Carlos Downey  wrote:
>
> Hi,
>
> We have an in-house platform that we want to integrate with external clients 
> via HDFS. They have lots of existing files and they continuously put more 
> data to HDFS. Ideally, we would like to have a Flink job that takes care of 
> ingesting data as one of the requirements is to execute SQL on top of these 
> files. We looked at existing FileSource implementation but we believe this 
> will not be well suited for this use case.
> - firstly, we'd have to ingest all files initially present on HDFS before 
> completing the first checkpoint - this is unacceptable for us, as we would have to 
> reprocess all the files again in case of early job failure. Not to mention 
> the state blowing up for aggregations.
> - secondly, we see no way to establish a valid watermark strategy. This is a 
> major pain point that we can't find the right answer for. We don't want to 
> assume too much about the data itself. In general, the only solutions we see 
> require some sort of synchronization across subtasks. On the other hand, the 
> simplest strategy is to delay the watermark. In that case though we are 
> afraid of accidentally dropping events.
>
> Given this, we are thinking about implementing our own file source. Has someone in 
> the community already tried to solve a similar problem? If not, any suggestions 
> about the concerns we raised would be valuable.
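(Illustrative note, not from the thread: a minimal sketch of the existing
FileSource in continuous monitoring mode, which in streaming execution checkpoints
its progress within the discovered files rather than only after reading everything.
The HDFS path, format, and discovery interval are placeholders, assuming Flink
1.14's TextLineFormat. It does not address the watermark-alignment concern, which
is what FLIP-182 targets.)

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsFileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///data/input"))  // placeholder
                // Keep scanning the directory for new files instead of reading once and stopping.
                .monitorContinuously(Duration.ofMinutes(1))
                .build();

        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "hdfs-files");

        lines.print();
        env.execute("hdfs-file-source-sketch");
    }
}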


Re: Error during shutdown of StandaloneApplicationClusterEntryPoint via JVM shutdown hook

2022-04-08 Thread Alexey Trenikhun
Hi Roman,
Currently rest.async.store-duration is not set. Are you suggesting decreasing the 
value from the default, or increasing it?

Thanks,
Alexey

From: Roman Khachatryan 
Sent: Friday, April 8, 2022 5:32:45 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Error during shutdown of StandaloneApplicationClusterEntryPoint 
via JVM shutdown hook

Hello,

Unfortunately, it's difficult to name the exact reason why the timeout
happens because there's no message logged.
I've opened a ticket to improve the logging [1].
There, I also listed some code paths that might lead to this situation.

From the described scenario, I'd suppose that it's
CompletedOperationCache.closeAsync()  that times out. It can be
verified or maybe mitigated by changing rest.async.store-duration  [2]
(the default is 5 minutes).
Could you check that?

[1]
https://issues.apache.org/jira/browse/FLINK-27144
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rest-async-store-duration

Regards,
Roman

On Wed, Apr 6, 2022 at 5:21 PM Alexey Trenikhun  wrote:
>
> Hello,
>
> We are using Flink 1.13.6, Application Mode, k8s HA. To upgrade job, we use 
> POST, 
> url=http://gsp-jm:8081/jobs//savepoints, then 
> we wait for up to 5 minutes for completion, periodically pulling status (GET, 
> url=http://gsp-jm:8081/jobs//savepoints/{triggerId}).
>  If savepoint is not complete in 5 minute, we cancel job (PATCH, 
> url=http://gsp-jm:8081/jobs/000). Usually it 
> works well, job stopped one way or another and we proceed with upgrade, but 
> currently JM exits with code -2, and as result k8s restarts pod. We tried 
> multiple times, but every time getting -2. JM log is below (newest messages 
> on top):
>
> 2022-04-06T14:21:17.465Z Error during shutdown of 
> StandaloneApplicationClusterEntryPoint via JVM shutdown hook.
> java.util.concurrent.CompletionException: 
> java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661)
>  at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>  at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.closeAsync(ClusterEntrypoint.java:379)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$new$0(ClusterEntrypoint.java:168)
>  at 
> org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
>  at java.lang.Thread.run(Thread.java:750) Caused by: 
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 common frames omitted
> 2022-04-06T14:21:17.464Z Terminating cluster entrypoint process 
> StandaloneApplicationClusterEntryPoint with exit code 2.
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:750)
> 2022-04-06T14:21:17.463Z Stopped Akka RPC service.
> 2022-04-06T14:21:17.46Z Stopped Akka RPC service.
> 2022

Re: DataStream request / response

2022-04-08 Thread Austin Cawley-Edwards
Good suggestion – though a common misconception with Statefun is that HTTP
ingestion is possible. Last time I checked it was still under theoretical
discussion. Do you know the current  state there?

Austin

On Fri, Apr 8, 2022 at 1:19 PM Roman Khachatryan  wrote:

> Hi,
>
> Besides the solution suggested by Austin, you might also want to look
> at Stateful Functions [1]. They provide a more convenient programming
> model for the use-case I think, while DataStream is a relatively
> low-level API.
>
> [1]
> https://nightlies.apache.org/flink/flink-statefun-docs-stable/
>
> Regards,
> Roman
>
> On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
>  wrote:
> >
> > Hi Jason,
> >
> > No, there is no HTTP source/ sink support that I know of for Flink.
> Would running the Spring + Kafka solution in front of Flink work for you?
> >
> > On a higher level, what drew you to migrating the microservice to Flink?
> >
> > Best,
> > Austin
> >
> > On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas 
> wrote:
> >>
> >> I'm taking an existing REST based microservice application and moving
> all of the logic into Flink DataStreams.
> >>
> >> Is there an easy way to get a request/response from a Flink DataStream
> so I can 'call' into it from a REST service?   For example, something
> similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
> >>
> >> Thanks for any help!
> >>
> >> -Jason
> >>
>


Unsubscribe

2022-04-08 Thread Natalie Dunn



Re: DataStream request / response

2022-04-08 Thread Roman Khachatryan
Hi,

Besides the solution suggested by Austin, you might also want to look
at Stateful Functions [1]. They provide a more convenient programming
model for the use-case I think, while DataStream is a relatively
low-level API.

[1]
https://nightlies.apache.org/flink/flink-statefun-docs-stable/

Regards,
Roman

On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
 wrote:
>
> Hi Jason,
>
> No, there is no HTTP source/ sink support that I know of for Flink. Would 
> running the Spring + Kafka solution in front of Flink work for you?
>
> On a higher level, what drew you to migrating the microservice to Flink?
>
> Best,
> Austin
>
> On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas  wrote:
>>
>> I'm taking an existing REST based microservice application and moving all of 
>> the logic into Flink DataStreams.
>>
>> Is there an easy way to get a request/response from a Flink DataStream so I 
>> can 'call' into it from a REST service?   For example, something similar to 
>> this Kafka streams example that uses Spring ReplyingKafkaTemplate - 
>> https://stackoverflow.com/a/58202587.
>>
>> Thanks for any help!
>>
>> -Jason
>>


Re: DataStream request / response

2022-04-08 Thread Austin Cawley-Edwards
Hi Jason,

No, there is no HTTP source/ sink support that I know of for Flink. Would
running the Spring + Kafka solution in front of Flink work for you?

On a higher level, what drew you to migrating the microservice to Flink?

Best,
Austin

On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas 
wrote:

> I'm taking an existing REST based microservice application and moving all
> of the logic into Flink DataStreams.
>
> Is there an easy way to get a request/response from a Flink DataStream
> so I can 'call' into it from a REST service?   For example, something
> similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
>
> Thanks for any help!
>
> -Jason
>
>


DataStream request / response

2022-04-08 Thread Jason Thomas
I'm taking an existing REST based microservice application and moving all
of the logic into Flink DataStreams.

Is there an easy way to get a request/response from a Flink DataStream so I
can 'call' into it from a REST service?   For example, something similar to
this Kafka streams example that uses Spring ReplyingKafkaTemplate -
https://stackoverflow.com/a/58202587.

Thanks for any help!

-Jason


RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Alexis Sarda-Espinosa
Hello,

I have a streaming job running on Flink 1.14.4 that uses managed state with 
RocksDB with incremental checkpoints as backend. I've been monitoring a dev 
environment that has been running for the last week and I noticed that state 
size and end-to-end duration have been increasing steadily. Currently, duration 
is 11 seconds and size is 917MB (as shown in the UI). The tasks with the 
largest state (614MB) come from keyed sliding windows. Some attributes of this 
job's setup:


  *   Windows are 11 minutes in size.
  *   Slide time is 1 minute.
  *   Throughput is approximately 20 events per minute.

I have 3 operators with these states:


  1.  Window state with ListState and no TTL.
  2.  Global window state with MapState> and a TTL of 1 hour 
(with cleanupInRocksdbCompactFilter(1000L)).
  3.  Global window state with ListState where the Pojo has an int and a 
long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) 
as well.

Both operators with global window state have logic to manually remove old state 
in addition to configured TTL. The other operator does override and call 
clear().

I have now analyzed the checkpoint folder with the state processor API, and 
I'll note here that I see 50 folders named chk-*** even though I don't set 
state.checkpoints.num-retained and the default should be 1. I loaded the data 
from the folder with the highest chk number and I see that my operators have 
these amounts respectively:


  1.  10 entries
  2.  80 entries
  3.  200 entries

I got those numbers with something like this:

savepoint
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
.process(...)
.collect()
.parallelStream()
.reduce(0, Integer::sum);

Where my WindowReaderFunction classes just count the number of entries in each 
call to readWindow.

Those amounts cannot possibly account for 614MB, so what am I missing?

Regards,
Alexis.



Re: RocksDB state not cleaned up

2022-04-08 Thread Yun Tang
Hi Alexis,

RocksDB itself provides manual compaction APIs [1], but Flink currently does not 
call these APIs to trigger periodic compactions.

If Flink supported such periodic compaction, it would, from my understanding, be 
somewhat like a major compaction in HBase. I am not sure whether this is really 
useful for Flink, as it could push data to the last level, which increases read 
amplification.

[1] 
https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/RocksDB.html

Best
Yun Tang
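(Illustrative, untested sketch: if one wanted to experiment with RocksDB's periodic
compaction from Flink, a custom RocksDBOptionsFactory could set it on the column
family options, assuming the bundled RocksDB version exposes
setPeriodicCompactionSeconds. The 7-day value is arbitrary.)

import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Ask RocksDB to rewrite SST files that have not been compacted for 7 days (arbitrary value).
        return currentOptions.setPeriodicCompactionSeconds(7 * 24 * 3600L);
    }
}

// Registered on the state backend, e.g.:
// EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// backend.setRocksDBOptions(new PeriodicCompactionOptionsFactory());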

From: Alexis Sarda-Espinosa 
Sent: Friday, April 8, 2022 18:54
To: tao xiao ; David Morávek 
Cc: Yun Tang ; user 
Subject: RE: RocksDB state not cleaned up


May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And 
if yes, if it helped with this case.



Regards,

Alexis.



From: tao xiao 
Sent: Samstag, 18. September 2021 05:01
To: David Morávek 
Cc: Yun Tang ; user 
Subject: Re: RocksDB state not cleaned up



Thanks for the feedback! However TTL already proves that the state cannot be 
cleaned up on time due to too many levels built up in RocksDB.



Hi @Yun Tang do you have any suggestions to tune 
RocksDB to accelerate the compaction progress?



On Fri, Sep 17, 2021 at 8:01 PM David Morávek <d...@apache.org> wrote:

Cleaning up with timers should solve this. Both approaches have some advantages 
and disadvantages though.



Timers:

- No "side effects".

- Can be set in event time. Deletes are regular tombstones that will get 
compacted later on.



TTL:

- Performance. This costs literally nothing compared to an extra state for 
timer + writing a tombstone marker.

- Has "side-effects", because it works in processing time. This is just 
something to keep in mind eg. when bootstraping the state from historical data. 
(large event time / processing time skew)



With 1.14 release, we've bumped the RocksDB version so it may be possible to 
use a "periodic compaction" [1], but nobody has tried that so far. In the 
meantime I think there is no real workaround because we don't expose a way to 
trigger manual compaction.



I'm off to vacation until 27th and I won't be responsive during that time. I'd 
like to pull Yun into the conversation as he's super familiar with the RocksDB 
state backend.



[1] 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction



Best,

D.



On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:

Hi David,



Confirmed with the RocksDB log that Stephan's observation is the root cause: 
compaction doesn't clean up the high-level SST files fast enough. Do you think 
manual clean-up by registering a timer is the way to go, or can any RocksDB 
parameter be tuned to mitigate this issue?



On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:

Hi David,



If I read Stephan's comment correctly TTL doesn't work well for cases where we 
have too many levels, like fast growing state,  as compaction doesn't clean up 
high level SST files in time, Is this correct? If yes should we register a 
timer with TTL time and manual clean up the state (state.clear() ) when the 
timer fires?



I will turn on RocksDB logging as well as compaction logging [1] to verify this



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction





On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:

Hi Tao,



my intuition is that the compaction of SST files is not triggering. By default, 
it's only triggered by the size ratios of different levels [1] and the TTL 
mechanism has no effect on it.



Some reasoning from Stephan:



It's very likely to have large files in higher levels that haven't been 
compacted in a long time and thus just stay around.



This might be especially possible if you insert a lot in the beginning (build 
up many levels) and then have a moderate rate of modifications, so the changes 
and expiration keep happening purely in the merges / compactions of the first 
levels. Then the later levels may stay unchanged for quite some time.



You should be able to see compaction details by setting RocksDB logging to INFO 
[2]. Can you please check these and validate whether this really is the case?



[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction

[2] 
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting



Best,

D.



On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:

Hi team



We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL 
update type is OnCreateAndWrite. We set the value state when the value state 
doesn't exist and we never update it again after the state is not empty. The 
key of the value state is timestamp. My understanding of such TTL settings is 
that the size of a

Re: Error during shutdown of StandaloneApplicationClusterEntryPoint via JVM shutdown hook

2022-04-08 Thread Roman Khachatryan
Hello,

Unfortunately, it's difficult to name the exact reason why the timeout
happens because there's no message logged.
I've opened a ticket to improve the logging [1].
There, I also listed some code paths that might lead to this situation.

From the described scenario, I'd suppose that it's
CompletedOperationCache.closeAsync()  that times out. It can be
verified or maybe mitigated by changing rest.async.store-duration  [2]
(the default is 5 minutes).
Could you check that?

[1]
https://issues.apache.org/jira/browse/FLINK-27144
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rest-async-store-duration

Regards,
Roman

On Wed, Apr 6, 2022 at 5:21 PM Alexey Trenikhun  wrote:
>
> Hello,
>
> We are using Flink 1.13.6, Application Mode, k8s HA. To upgrade job, we use 
> POST, 
> url=http://gsp-jm:8081/jobs//savepoints, then 
> we wait for up to 5 minutes for completion, periodically pulling status (GET, 
> url=http://gsp-jm:8081/jobs//savepoints/{triggerId}).
>  If savepoint is not complete in 5 minute, we cancel job (PATCH, 
> url=http://gsp-jm:8081/jobs/000). Usually it 
> works well, job stopped one way or another and we proceed with upgrade, but 
> currently JM exits with code -2, and as result k8s restarts pod. We tried 
> multiple times, but every time getting -2. JM log is below (newest messages 
> on top):
>
> 2022-04-06T14:21:17.465Z Error during shutdown of 
> StandaloneApplicationClusterEntryPoint via JVM shutdown hook.
> java.util.concurrent.CompletionException: 
> java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661)
>  at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>  at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.closeAsync(ClusterEntrypoint.java:379)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$new$0(ClusterEntrypoint.java:168)
>  at 
> org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
>  at java.lang.Thread.run(Thread.java:750) Caused by: 
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 common frames omitted
> 2022-04-06T14:21:17.464Z Terminating cluster entrypoint process 
> StandaloneApplicationClusterEntryPoint with exit code 2.
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:750)
> 2022-04-06T14:21:17.463Z Stopped Akka RPC service.
> 2022-04-06T14:21:17.46Z Stopped Akka RPC service.
> 2022-04-06T14:21:17.373Z Remoting shut down.
> 2022-04-06T14:21:17.373Z Remoting shut down.
> 2022-04-06T14:21:17.297Z Remote daemon shut down; proceeding with flushing 
> remote transports.
> 2022-04-06T14:21:17.297Z Remote daemon shut down; proceeding with flushing 
> remote transports.
> 2022-04-06T14:21:17.296Z Shutting down remote daemon.
> 2022-04-06T14:21:17.296Z Shutting down remote dae

RE: RocksDB state not cleaned up

2022-04-08 Thread Alexis Sarda-Espinosa
May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And 
if yes, if it helped with this case.

Regards,
Alexis.

From: tao xiao 
Sent: Samstag, 18. September 2021 05:01
To: David Morávek 
Cc: Yun Tang ; user 
Subject: Re: RocksDB state not cleaned up

Thanks for the feedback! However TTL already proves that the state cannot be 
cleaned up on time due to too many levels built up in RocksDB.

Hi @Yun Tang do you have any suggestions to tune 
RocksDB to accelerate the compaction progress?

On Fri, Sep 17, 2021 at 8:01 PM David Morávek <d...@apache.org> wrote:
Cleaning up with timers should solve this. Both approaches have some advantages 
and disadvantages though.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get 
compacted later on.

TTL:
- Performance. This costs literally nothing compared to an extra state for 
timer + writing a tombstone marker.
- Has "side-effects", because it works in processing time. This is just 
something to keep in mind eg. when bootstraping the state from historical data. 
(large event time / processing time skew)

With 1.14 release, we've bumped the RocksDB version so it may be possible to 
use a "periodic compaction" [1], but nobody has tried that so far. In the 
meantime I think there is no real workaround because we don't expose a way to 
trigger manual compaction.

I'm off to vacation until 27th and I won't be responsive during that time. I'd 
like to pull Yun into the conversation as he's super familiar with the RocksDB 
state backend.

[1] 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:
Hi David,

Confirmed with the RocksDB log that Stephan's observation is the root cause: 
compaction doesn't clean up the high-level SST files fast enough. Do you think 
manual clean-up by registering a timer is the way to go, or can any RocksDB 
parameter be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:
Hi David,

If I read Stephan's comment correctly TTL doesn't work well for cases where we 
have too many levels, like fast growing state,  as compaction doesn't clean up 
high level SST files in time, Is this correct? If yes should we register a 
timer with TTL time and manual clean up the state (state.clear() ) when the 
timer fires?

I will turn on RocksDB logging as well as compaction logging [1] to verify this

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
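(Illustrative sketch of the timer-based cleanup discussed here: a
KeyedProcessFunction that registers an event-time timer when the value is first
written and clears the state when the timer fires. Types, names, and the one-day
interval are placeholders.)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerCleanupFunction extends KeyedProcessFunction<Long, String, String> {

    private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<String> value;

    @Override
    public void open(Configuration parameters) {
        value = getRuntimeContext().getState(new ValueStateDescriptor<>("value", String.class));
    }

    @Override
    public void processElement(String in, Context ctx, Collector<String> out) throws Exception {
        if (value.value() == null) {
            value.update(in);
            // Schedule explicit cleanup instead of (or in addition to) relying on TTL compaction.
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + ONE_DAY_MS);
        }
        out.collect(in);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Writes a tombstone for the key, which later compactions can drop.
        value.clear();
    }
}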


On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
Hi Tao,

my intuition is that the compaction of SST files is not triggering. By default, 
it's only triggered by the size ratios of different levels [1] and the TTL 
mechanism has no effect on it.

Some reasoning from Stephan:

It's very likely to have large files in higher levels that haven't been 
compacted in a long time and thus just stay around.

This might be especially possible if you insert a lot in the beginning (build 
up many levels) and then have a moderate rate of modifications, so the changes 
and expiration keep happening purely in the merges / compactions of the first 
levels. Then the later levels may stay unchanged for quite some time.

You should be able to see compaction details by setting RocksDB logging to INFO 
[2]. Can you please check these and validate whether this really is the case?

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
[2] 
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting

Best,
D.

On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
Hi team

We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL 
update type is OnCreateAndWrite. We set the value state when the value state 
doesn't exist and we never update it again after the state is not empty. The 
key of the value state is timestamp. My understanding of such TTL settings is 
that the size of all SST files remains flat (let's disregard the impact space 
amplification brings) after 1 day as the daily data volume is more or less the 
same. However, the RocksDB native metrics show that the SST files have continued to 
grow since I started the job. I checked the SST files in local storage and I can 
see SST files that are 1 month old (from when I started the job). What is the 
possible reason for the SST files not being cleaned up?

The Flink version is 1.12.1
State backend is RocksDB with incremental checkpoint
All default configuration for RocksDB
Per job mode in Yarn and checkpoint to S3


Here is the code to set value state

public void open(Configuration parameters) {
StateTtlConfig ttlConfigClick = StateTtlConfig
.newBuilder(Time.days(1))
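(The snippet above is cut off in the archive; a complete StateTtlConfig of the kind
described, i.e. a 1-day TTL with OnCreateAndWrite update semantics and cleanup in the
RocksDB compaction filter, would look roughly like this sketch. The descriptor name
and value type are placeholders.)

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlDescriptorSketch {
    static ValueStateDescriptor<String> buildDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Re-check expiration after this many state entries during RocksDB compaction.
                .cleanupInRocksdbCompactFilter(1000L)
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("value", String.class);  // placeholder name/type
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}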
   

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Martijn Visser
Hi Jin,

Flink is an open source project, so the community works on best-effort.
There's no guaranteed/quick support available. There are companies that
provide commercial support if needed.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 8 Apr 2022 at 12:13, Jin Yi  wrote:

> confirmed that moving back to FlinkKafkaConsumer fixes things.
>
> is there some notification channel/medium that highlights critical
> bugs/issues on the intended features like this pretty readily?
>
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi  wrote:
>
>> based on symptoms/observations on the first operator (LogRequestFilter)
>> watermark and event timestamps, it does seem like it's the bug.  things
>> track fine (timestamp > watermark) for the first batch of events, then the
>> event timestamps go back into the past and are "late".
>>
>> looks like the 1.14 backport just got in 11 days ago (
>> https://github.com/apache/flink/pull/19128).  is there a way to easily
>> test this fix locally?  based on the threads, should i just move back to
>> FlinkKafkaConsumer until 1.14.5?
>>
>> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:
>>
>>> Hi Jin,
>>>
>>> If you are using new FLIP-27 sources like KafkaSource, per-partition
>>> watermark (or per-split watermark) is a default feature integrated in
>>> SourceOperator. You might hit the bug described in FLINK-26018 [1], which
>>> happens during the first fetch of the source that records in the first
>>> split pushes the watermark far away, then records from other splits will be
>>> treated as late events.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-26018
>>>
>>> Best regards,
>>>
>>> Qingsheng
>>>
>>>
>>> > On Apr 8, 2022, at 15:54, Jin Yi  wrote:
>>> >
>>> > how should the code look like to verify we're using per-partition
>>> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
>>> 1.14.4?
>>> >
>>> > we currently have it looking like:
>>> >
>>> > streamExecutionEnvironment.fromSource(
>>> >KafkaSource.builder().build(),
>>> >watermarkStrategy,
>>> >"whatever",
>>> >typeInfo);
>>> >
>>> > when running this job with the streamExecutionEnviornment parallelism
>>> set to 1, and the kafka source having 30 partitions, i'm seeing weird
>>> behaviors where the first operator after this source consumes events out of
>>> order (and therefore, violates watermarks).  the operator simply checks to
>>> see what "type" of event something is and uses side outputs to output the
>>> type-specific messages.  here's a snippet of the event timestamp going back
>>> before the current watermark (first instance of going backwards in time in
>>> bold):
>>> >
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,317 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
>>> >
>>> >
>>> >
>>> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill 
>>> wrote:
>>> > I dove deeper.  I wasn't actually using per-partition watermarks.
>>> Thank you for the help!
>>> >
>>> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill 
>>> wrote:
>>> > Thanks, Thias and Dongwon.
>>> >
>>>

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Jin Yi
confirmed that moving back to FlinkKafkaConsumer fixes things.

is there some notification channel/medium that highlights critical
bugs/issues on the intended features like this pretty readily?

On Fri, Apr 8, 2022 at 2:18 AM Jin Yi  wrote:

> based on symptoms/observations on the first operator (LogRequestFilter)
> watermark and event timestamps, it does seem like it's the bug.  things
> track fine (timestamp > watermark) for the first batch of events, then the
> event timestamps go back into the past and are "late".
>
> looks like the 1.14 backport just got in 11 days ago (
> https://github.com/apache/flink/pull/19128).  is there a way to easily
> test this fix locally?  based on the threads, should i just move back to
> FlinkKafkaConsumer until 1.14.5?
>
> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:
>
>> Hi Jin,
>>
>> If you are using new FLIP-27 sources like KafkaSource, per-partition
>> watermark (or per-split watermark) is a default feature integrated in
>> SourceOperator. You might hit the bug described in FLINK-26018 [1], which
>> happens during the first fetch of the source that records in the first
>> split pushes the watermark far away, then records from other splits will be
>> treated as late events.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-26018
>>
>> Best regards,
>>
>> Qingsheng
>>
>>
>> > On Apr 8, 2022, at 15:54, Jin Yi  wrote:
>> >
>> > how should the code look like to verify we're using per-partition
>> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
>> 1.14.4?
>> >
>> > we currently have it looking like:
>> >
>> > streamExecutionEnvironment.fromSource(
>> >KafkaSource.builder().build(),
>> >watermarkStrategy,
>> >"whatever",
>> >typeInfo);
>> >
>> > when running this job with the streamExecutionEnviornment parallelism
>> set to 1, and the kafka source having 30 partitions, i'm seeing weird
>> behaviors where the first operator after this source consumes events out of
>> order (and therefore, violates watermarks).  the operator simply checks to
>> see what "type" of event something is and uses side outputs to output the
>> type-specific messages.  here's a snippet of the event timestamp going back
>> before the current watermark (first instance of going backwards in time in
>> bold):
>> >
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
>> > 2022-04-08 05:47:06,317 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
>> >
>> >
>> >
>> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill 
>> wrote:
>> > I dove deeper.  I wasn't actually using per-partition watermarks.
>> Thank you for the help!
>> >
>> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill 
>> wrote:
>> > Thanks, Thias and Dongwon.
>> >
>> > I'll keep debugging this with the idle watermark turned off.
>> >
>> > Next TODOs:
>> > - Verify that we’re using per-partition watermarks.  Our code matches
>> the example but maybe something is disabling it.
>> > - Enable logging of partition-consumer assignment, to see if that is
>> the cause of the problem.
>> > - Look at adding flags to set the source parallelism to see if that
>> fixes the issue.
>> >
>> > Yes, I've seen Flink talks 

RE: Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Alexis Sarda-Espinosa
Hi Roman,

Thanks for the quick response. It wasn't that, but your comment about erasure
made me realize I should have debugged the code and inspected the actual types.
It turns out that enabling state TTL changes the serializer, so I also had to
apply the same TTL configuration to the descriptor used in the
WindowReaderFunction.
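
Roughly, the shape is the following (a minimal sketch; the 1-day TTL and the
Set value type are placeholders for illustration, not the real values).
Sharing one factory method between the streaming job and the reader keeps the
two serializers identical:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Set;

public class Descriptors {

    // Used by both the streaming job and the WindowReaderFunction, so the TTL
    // settings (and therefore the serializer) are guaranteed to match.
    public static MapStateDescriptor<Long, Set<String>> windowMapState() {
        MapStateDescriptor<Long, Set<String>> msd = new MapStateDescriptor<>(
                "descriptorName",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<Set<String>>() {}));
        msd.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());
        return msd;
    }
}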

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Freitag, 8. April 2022 11:48
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint

Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink consider 
the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers 
(constructed manually)?

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa 
 wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses 
> MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor> msd = new MapStateDescriptor<>(
>
> "descriptorName",
>
> TypeInformation.of(Long.class),
>
> TypeInformation.of(new TypeHint>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with 
> RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this 
> in readWindow:
>
>
>
> MapState> mapState = 
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
> configure the reader function like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(
>
> "my-uid",
>
> new StateReaderFunction(),
>
> Types.STRING,
>
> TypeInformation.of(MyPojo.class),
>
> Types.INT
>
> )
>
> .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not 
> be incompatible with the old state serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Roman Khachatryan
Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink
consider the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing
serializers (constructed manually)?
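
Something along these lines, for illustration (the Long key and List value
types below are just assumptions, since I don't know your actual types):

import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class ExplicitSerializers {

    // Build the serializers by hand instead of deriving them from
    // TypeInformation, so the job and the reader end up with identical ones.
    public static MapStateDescriptor<Long, List<String>> descriptor() {
        return new MapStateDescriptor<>(
                "descriptorName",
                LongSerializer.INSTANCE,
                new ListSerializer<>(StringSerializer.INSTANCE));
    }
}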

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa
 wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses 
> MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor> msd = new MapStateDescriptor<>(
>
> "descriptorName",
>
> TypeInformation.of(Long.class),
>
> TypeInformation.of(new TypeHint>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with 
> RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this 
> in readWindow:
>
>
>
> MapState> mapState = 
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
> configure the reader function like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(
>
> "my-uid",
>
> new StateReaderFunction(),
>
> Types.STRING,
>
> TypeInformation.of(MyPojo.class),
>
> Types.INT
>
> )
>
> .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not 
> be incompatible with the old state serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Jin Yi
Based on the symptoms observed at the first operator (LogRequestFilter),
comparing watermarks and event timestamps, it does seem to be that bug. Things
track fine (timestamp > watermark) for the first batch of events, then the
event timestamps jump back into the past and are treated as "late".

It looks like the 1.14 backport was merged 11 days ago
(https://github.com/apache/flink/pull/19128). Is there a way to easily test
this fix locally? Based on the threads, should I just move back to
FlinkKafkaConsumer until 1.14.5?
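
If we do fall back, I assume it would look roughly like this (topic, group id
and deserialization schema below are placeholders), with the watermark strategy
assigned on the consumer itself so that watermarks stay per-partition:

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LegacyConsumerFallback {

    public static DataStream<String> build(StreamExecutionEnvironment env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Assigning the strategy on the consumer (not downstream on the stream)
        // keeps watermark generation per Kafka partition inside the source.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

        return env.addSource(consumer);
    }
}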

On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:

> Hi Jin,
>
> If you are using new FLIP-27 sources like KafkaSource, per-partition
> watermark (or per-split watermark) is a default feature integrated in
> SourceOperator. You might hit the bug described in FLINK-26018 [1], which
> happens during the first fetch of the source that records in the first
> split pushes the watermark far away, then records from other splits will be
> treated as late events.
>
> [1] https://issues.apache.org/jira/browse/FLINK-26018
>
> Best regards,
>
> Qingsheng
>
>
> > On Apr 8, 2022, at 15:54, Jin Yi  wrote:
> >
> > how should the code look like to verify we're using per-partition
> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
> 1.14.4?
> >
> > we currently have it looking like:
> >
> > streamExecutionEnvironment.fromSource(
> >KafkaSource.builder().build(),
> >watermarkStrategy,
> >"whatever",
> >typeInfo);
> >
> > when running this job with the streamExecutionEnviornment parallelism
> set to 1, and the kafka source having 30 partitions, i'm seeing weird
> behaviors where the first operator after this source consumes events out of
> order (and therefore, violates watermarks).  the operator simply checks to
> see what "type" of event something is and uses side outputs to output the
> type-specific messages.  here's a snippet of the event timestamp going back
> before the current watermark (first instance of going backwards in time in
> bold):
> >
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> > 2022-04-08 05:47:06,317 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> >
> >
> >
> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill  wrote:
> > I dove deeper.  I wasn't actually using per-partition watermarks.  Thank
> you for the help!
> >
> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill  wrote:
> > Thanks, Thias and Dongwon.
> >
> > I'll keep debugging this with the idle watermark turned off.
> >
> > Next TODOs:
> > - Verify that we’re using per-partition watermarks.  Our code matches
> the example but maybe something is disabling it.
> > - Enable logging of partition-consumer assignment, to see if that is the
> cause of the problem.
> > - Look at adding flags to set the source parallelism to see if that
> fixes the issue.
> >
> > Yes, I've seen Flink talks on creating our own watermarks through
> Kafka.  Sounds like a good idea.
> >
> > On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim 
> wrote:
> > I totally agree with Schwalbe that per-partition watermarking allows #
> source tasks < # kafka partitions.
> >
> > Otherwise, Dan, you should suspect other possibilities like what
> Schwalbe said.
> >
> > Best,
> >
> > D

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Qingsheng Ren
Hi Jin,

If you are using the new FLIP-27 sources such as KafkaSource, per-partition
(per-split) watermarking is a default feature integrated in SourceOperator.
You might be hitting the bug described in FLINK-26018 [1]: during the first
fetch of the source, records from the first split can push the watermark far
ahead, so records from the other splits are then treated as late events.

[1] https://issues.apache.org/jira/browse/FLINK-26018
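
For reference, per-split watermarking needs no extra configuration on your
side; a minimal KafkaSource setup is roughly the following (connection details
and the out-of-orderness bound are placeholders):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {

    public static DataStream<String> build(StreamExecutionEnvironment env) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Watermarks are generated per split inside the SourceOperator; the bug
        // above only affects the very first fetch, not this configuration.
        return env.fromSource(
                source,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
                "kafka-source");
    }
}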

Best regards,

Qingsheng


> On Apr 8, 2022, at 15:54, Jin Yi  wrote:
> 
> how should the code look like to verify we're using per-partition watermarks 
> if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
> 
> we currently have it looking like:
> 
> streamExecutionEnvironment.fromSource(
>KafkaSource.builder().build(),
>watermarkStrategy,
>"whatever",
>typeInfo);
> 
> when running this job with the streamExecutionEnviornment parallelism set to 
> 1, and the kafka source having 30 partitions, i'm seeing weird behaviors 
> where the first operator after this source consumes events out of order (and 
> therefore, violates watermarks).  the operator simply checks to see what 
> "type" of event something is and uses side outputs to output the 
> type-specific messages.  here's a snippet of the event timestamp going back 
> before the current watermark (first instance of going backwards in time in 
> bold):
> 
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> 2022-04-08 05:47:06,317 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> 
> 
> 
> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill  wrote:
> I dove deeper.  I wasn't actually using per-partition watermarks.  Thank you 
> for the help!
> 
> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill  wrote:
> Thanks, Thias and Dongwon.
> 
> I'll keep debugging this with the idle watermark turned off.
> 
> Next TODOs:
> - Verify that we’re using per-partition watermarks.  Our code matches the 
> example but maybe something is disabling it.
> - Enable logging of partition-consumer assignment, to see if that is the 
> cause of the problem.
> - Look at adding flags to set the source parallelism to see if that fixes the 
> issue.
> 
> Yes, I've seen Flink talks on creating our own watermarks through Kafka.  
> Sounds like a good idea.
> 
> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim  wrote:
> I totally agree with Schwalbe that per-partition watermarking allows # source 
> tasks < # kafka partitions. 
> 
> Otherwise, Dan, you should suspect other possibilities like what Schwalbe 
> said.
> 
> Best,
> 
> Dongwon
> 
> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias 
>  wrote:
> Hi San, Dongwon,
> 
>  
> 
> I share the opinion that when per-partition watermarking is enabled, you 
> should observe correct behavior … would be interesting to see why it does not 
> work for you.
> 
>  
> 
> I’d like to clear one tiny misconception here when you write:
> 
>  
> 
> >> - The same issue happens even if I use an idle watermark.
> 
>  
> 
> You would expect to see glitches with watermarking when you enable idleness.
> 
> Idleness sort of trades watermark correctness for reduces latency when 
> processing timers (much simplifie

Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Alexis Sarda-Espinosa
Hi everyone,

I have a ProcessWindowFunction that uses Global window state. It uses MapState 
with a descriptor defined like this:

MapStateDescriptor> msd = new MapStateDescriptor<>(
"descriptorName",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint>() {})
);

Now I'm trying to access a checkpoint's state data to read that (created with 
RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction that defines the same descriptor and calls this in 
readWindow:

MapState> mapState = context.globalState().getMapState(msd);

After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
configure the reader function like this:

savepoint
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
.process(
"my-uid",
new StateReaderFunction(),
Types.STRING,
TypeInformation.of(MyPojo.class),
Types.INT
)
.print();

But I am getting this exception:

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) 
must not be incompatible with the old state serializer 
(org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).

Does someone know what I'm doing wrong in my setup?

Regards,
Alexis.
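
For completeness, the loading step mentioned above is roughly the following
(the savepoint path is a placeholder):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class SavepointLoader {

    public static ExistingSavepoint load() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Incremental RocksDB backend, matching the streaming job's configuration.
        return Savepoint.load(
                env,
                "file:///path/to/checkpoint",  // placeholder path
                new EmbeddedRocksDBStateBackend(true));
    }
}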