Re: Issues with Flink scheduler?

2022-08-01 Thread Hemanga Borah
We are currently using version 1.14. The final manifestation of the issue
shows up as the trace I pasted above, and then the job keeps restarting.
When we trace back, we see various exceptions depending on the job; for
example, in one of the jobs some tasks were failing due to out-of-memory
exceptions. We resolve the issue by deleting all the task manager pods from
the Kubernetes cluster. As soon as we delete all the task managers, new pods
are created and the job starts up normally. My feeling is that the scheduler
tries to restart the job very aggressively and so is not able to find enough
resources.
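For illustration, a less aggressive restart strategy (not something we have
validated yet) would give the replacement task manager pods time to register
before the scheduler asks for slots again. A minimal sketch, where the attempt
count and delay are placeholders:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlowerRestartExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Space out restart attempts so replacement TaskManager pods have time
        // to come up and register their slots before the job is rescheduled.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                10,                 // placeholder: maximum restart attempts
                Time.minutes(2)));  // placeholder: delay between attempts

        // Trivial pipeline so the sketch runs end to end; replace with the real job.
        env.fromElements(1, 2, 3).print();
        env.execute("job-with-slower-restarts");
    }
}

The same restart strategy can also be configured cluster-wide in
flink-conf.yaml instead of in application code.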

On Sun, Jul 31, 2022 at 6:59 PM Lijie Wang  wrote:

> Hi,
>
> Which version are you using? Has any job failover occurred? It would be
> better if you can provide the full log of JM.
>
> Best,
> Lijie
>
> On Mon, Aug 1, 2022 at 01:47, Hemanga Borah  wrote:
>
>> Hello guys,
>>  We have been seeing an issue with our Flink applications. Our
>> applications run fine for several hours, and then we see an error/exception
>> like so:
>>
>> java.util.concurrent.CompletionException: 
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not acquire the minimum required resources.
>>
>> For some applications, this error/exception appears once, stays in the
>> history for a while, but the job recovers. However, for some
>> applications, we see this error thrown repeatedly, and the application gets
>> into a crash loop.
>>
>> Since our application had been running fine for several hours before we
>> saw such a message, our suspicion is that when the crash happens, the job
>> manager aggressively tries to restart the job and is not able to
>> acquire enough resources because the previous job has not yet cleaned up.
>>
>> Has anyone else been seeing this issue? If so, what did you guys do to fix
>> it?
>>
>> Thanks,
>> HKB
>>
>>


Issues with Flink scheduler?

2022-07-31 Thread Hemanga Borah
Hello guys,
 We have been seeing an issue with our Flink applications. Our applications
run fine for several hours, and then we see an error/exception like so:

java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.

For some applications, this error/exception appears once, stays in the
history for a while, but the job recovers. However, for some
applications, we see this error thrown repeatedly, and the application gets
into a crash loop.

Since our application had been running fine for several hours before we saw
such a message, our suspicion is that when the crash happens, the job
manager aggressively tries to restart the job and is not able to
acquire enough resources because the previous job has not yet cleaned up.

Has anyone else been seeing this issue? If so, what did you guys do to fix
it?

Thanks,
HKB


Re: DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Hemanga Borah
Here is the documentation of the Tuple class:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/tuple/Tuple.html

If you need a concrete class, you can go from Tuple0 to Tuple25.
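For example, if the configuration ends up selecting two key fields, a selector
that returns a concrete Tuple2 lets Flink derive the key type on its own. This
is only a sketch; SimpleRecord and its fields are hypothetical stand-ins for
your actual record class:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical record type standing in for the actual class.
class SimpleRecord {
    public String userId;
    public Long accountId;
}

// Returning a concrete Tuple2 (rather than the abstract Tuple) lets Flink
// derive the key's TypeInformation from the selector's signature.
public class SimpleRecordKeySelector
        implements KeySelector<SimpleRecord, Tuple2<String, Long>> {

    @Override
    public Tuple2<String, Long> getKey(SimpleRecord record) {
        return Tuple2.of(record.userId, record.accountId);
    }
}

If the number of key fields really does vary at run time, another common
workaround is to concatenate the configured fields into a single String key,
which avoids the Tuple arity question entirely.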

On Sun, Jul 10, 2022 at 5:43 PM Thomas Wang  wrote:

> I didn't copy the exact error message, but the gist of it is that I cannot
> use the abstract class Tuple and instead should use Tuple1, Tuple2, etc.
>
> Thomas
>
> On Sun, Jul 10, 2022 at 12:47 PM Hemanga Borah 
> wrote:
>
>> What error do you see?
>>
>> On Sun, Jul 10, 2022 at 6:30 AM Thomas Wang  wrote:
>>
>>> Hi,
>>>
>>> I have a use case where I need to call DataStream.keyBy() with keys
>>> loaded from a configuration. The number of keys and their data types are
>>> variable and are determined by the configuration. Once the configuration is
>>> loaded, they won't change. I'm trying to use the following key selector,
>>> but it looks like I cannot use Tuple as the key type here. Is there any way
>>> I can work around this, as the rest of the logic of my application is the
>>> same? Thank you!
>>>
>>> public class SimpleRecordKeySelector implements
>>> KeySelector
>>>
>>> Thomas
>>>
>>>


Re: DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Hemanga Borah
What error do you see?

On Sun, Jul 10, 2022 at 6:30 AM Thomas Wang  wrote:

> Hi,
>
> I have a use case where I need to call DataStream.keyBy() with keys loaded
> from a configuration. The number of keys and their data types are variable
> and are determined by the configuration. Once the configuration is loaded,
> they won't change. I'm trying to use the following key selector, but it
> looks like I cannot use Tuple as the key type here. Is there any way I can
> work around this, as the rest of the logic of my application is the same?
> Thank you!
>
> public class SimpleRecordKeySelector implements
> KeySelector
>
> Thomas
>
>


Re: Unit test have Error "could not find implicit value for evidence parameter"

2022-07-09 Thread Hemanga Borah
What version of Scala are you using?

Depending on the build you are using, you may be on Scala 2.11 or 2.12. The
Scala version in your Maven build files needs to be consistent across your build.

On Fri, Jul 8, 2022 at 10:00 PM Min Tu via user 
wrote:

> Hi,
>
> I have downloaded the Flink code and the build works fine with the following
> command
>
> mvnw install -DskipTests -Dcheckstyle.skip
>
> Then I tried to run the unit test code in IntelliJ, but got the following error:
>
>
> /Users/mintu/ApacheProjects/flink/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala:34:41
> *could not find implicit value for evidence parameter of type*
> org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
> val solutionInput = env.fromElements((1, "1"))
>
> Please advise.
>
> Thanks in advance
>


Re: Migrating Flink apps across cloud with state

2022-05-11 Thread Hemanga Borah
Thank you for the suggestions, guys!

@Andrew Otto
This is the way we will most likely go. However, this will require us to
meddle with the Flink consumer codebase, and it looks like there is no other
way around it. We will add some custom code to perform offset resetting for
specific savepoints.
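A rough sketch of what that custom code might build on is below. It assumes
MirrorMaker 2 checkpointing is enabled and uses the mirror-client library's
RemoteClusterUtils; the cluster alias, bootstrap servers, and consumer group
below are made up, and the exact method signature should be checked against
your Kafka version:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class OffsetTranslationSketch {

    public static void main(String[] args) throws Exception {
        // Properties for connecting to the *target* cluster, where MirrorMaker 2
        // writes its checkpoint topics.
        Map<String, Object> targetClusterProps = new HashMap<>();
        targetClusterProps.put("bootstrap.servers", "target-cloud-kafka:9092"); // placeholder

        // "source" must match the source cluster alias in the MM2 configuration;
        // the group id is the one whose offsets were committed on the source cluster.
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                        targetClusterProps,
                        "source",
                        "my-flink-app-consumer-group",
                        Duration.ofSeconds(30));

        translated.forEach((partition, offset) ->
                System.out.printf("%s -> %d%n", partition, offset.offset()));
    }
}

One caveat: a Flink savepoint stores offsets in operator state rather than in
the consumer group, so the translated offsets still have to be injected into
whatever startup path the job actually uses on the new cloud.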

@Konstantin Knauf
This is a valuable suggestion, and we thought about it. However, we are
migrating 25+ applications, and making such code changes for each of them
would be quite expensive. So, even though this method would definitely get us
to the goal, we cannot afford to adopt it at this time because of the amount
of manual work involved.
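For reference, the event-time split Konstantin describes below comes down to a
small filter chained after each source. A sketch, assuming the cut-over
timestamp comes from configuration and each record exposes its event time:

import org.apache.flink.api.common.functions.FilterFunction;

// Keeps records on one side of the migration timestamp. On Cloud1 construct it
// with keepBeforeCutover = true, on Cloud2 with keepBeforeCutover = false.
public abstract class MigrationCutoverFilter<T> implements FilterFunction<T> {

    private final long migrationTimestampMs;
    private final boolean keepBeforeCutover;

    protected MigrationCutoverFilter(long migrationTimestampMs, boolean keepBeforeCutover) {
        this.migrationTimestampMs = migrationTimestampMs;
        this.keepBeforeCutover = keepBeforeCutover;
    }

    // How to read the event-time timestamp (in ms) from a record; job-specific.
    protected abstract long eventTimeMs(T record);

    @Override
    public boolean filter(T record) {
        boolean atOrBeforeCutover = eventTimeMs(record) <= migrationTimestampMs;
        return keepBeforeCutover ? atOrBeforeCutover : !atOrBeforeCutover;
    }
}

Chained as source.filter(...) on both clouds with the same configured
timestamp, this is the mechanism behind steps 1) and 2) in the quoted message.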


On Tue, May 10, 2022 at 11:59 AM Konstantin Knauf  wrote:

> Hi there,
>
> to me the simplest and most reliable solution still seems to be to split
> the stream based on event time. It requires a bit of preparation and assumes
> that you can tolerate some downtime when doing the migration.
>
> 1) For Cloud1 you chain a filter to your sources that filters out any
> records with a timestamp >  t_migration. Best you make this timestamp
> configurable.
> 2) For Cloud2, you chain a filter to your sources that filters out any
> records with timestamp <= t_migration.  Also configurable.
> 3) When you do the migration you configure t_migration to be, let's say 1
> hour in the future. You let the Job run in Cloud1 until you are sure that
> no more data with an event timestamp <= t_migration will arrive. You take a
> savepoint.
> 4) You start your application in cloud2 with the same value for
> t_migration and manually configured Kafka offsets for which you are sure
> they contain all records with a timestamp > t_migration.
>
> Could this work for you?
>
> Cheers,
>
> Konstantin
>
>
>
>
> On Wed, May 4, 2022 at 22:26, Andrew Otto  wrote:
>
>> Have you tried MirrorMaker 2's consumer offset translation feature?  I
>> have not used this myself, but it sounds like what you are looking for!
>> https://issues.apache.org/jira/browse/KAFKA-9076
>>
>> https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/mirror/Checkpoint.html
>> https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/
>>
>> I tried to find some better docs to link for you, but that's the best I
>> got :)  It looks like there is just the Java API.
>>
>>
>>
>> On Wed, May 4, 2022 at 3:29 PM Hemanga Borah 
>> wrote:
>>
>>> Thank you for the suggestions, guys!
>>>
>>> @Austin Cawley-Edwards
>>> Your idea is spot on! This approach would surely work. We could take a
>>> savepoint of each of our apps, load it using state processor apis and
>>> create another savepoint accounting for the delta on the offsets, and start
>>> the app on the new cloud using this modified savepoint.
>>> However, the solution will not be generic, and we have to do this for
>>> each of our applications. This can be quite cumbersome as we have several
>>> applications (around 25).
>>>
>>> We are thinking of overriding the FlinkKafkaConsumerBase to account for
>>> the offset deltas during the start-up of any app. Do you think it is safe
>>> to do that? Is there a better way of doing this?
>>>
>>> @Schwalbe Matthias
>>> Thank you for your suggestion. We do use exactly-once semantics, but
>>> our apps can tolerate a few duplicates in rare cases like this one where we
>>> are migrating clouds. However, your suggestion is really helpful and we
>>> will use it in case some of the apps cannot tolerate duplicate data.
>>>
>>>
>>> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
>>> matthias.schwa...@viseca.ch> wrote:
>>>
>>>> Hello Hemanga,
>>>>
>>>>
>>>>
>>>> MirrorMaker can cause havoc in many respects; for one, it does not have
>>>> strict exactly-once semantics…
>>>>
>>>>
>>>>
>>>> The way I would tackle this problem (and have done in similar
>>>> situations):
>>>>
>>>>
>>>>
>>>>- For the source topics that need to have exactly-once semantics
>>>>and that are not intrinsically idempotent:
>>>>- Add one extra operator after the source that deduplicates events
>>>>by unique id for a rolling time range (on the source cloud provider)
>>>>- Take a savepoint after the rolling time-range has passed (at
>>>>least once completely)
>>>>- Move your job to the target cloud provider
>>>>- Reconfi

Re: Migrating Flink apps across cloud with state

2022-05-04 Thread Hemanga Borah
Thank you for the suggestions, guys!

@Austin Cawley-Edwards
Your idea is spot on! This approach would surely work. We could take a
savepoint of each of our apps, load it using state processor apis and
create another savepoint accounting for the delta on the offsets, and start
the app on the new cloud using this modified savepoint.
However, the solution will not be generic, and we have to do this for each
of our applications. This can be quite cumbersome as we have several
applications (around 25).

We are thinking of overriding the FlinkKafkaConsumerBase to account for the
offset deltas during the start-up of any app. Do you think it is safe to do
that? Is there a better way of doing this?

@Schwalbe Matthias
Thank you for your suggestion. We do use exactly-once semantics, but our
apps can tolerate a few duplicates in rare cases like this one where we are
migrating clouds. However, your suggestion is really helpful and we will
use it in case some of the apps cannot tolerate duplicate data.
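For reference, here is a rough sketch of the kind of deduplication operator
Matthias describes below. It assumes the stream has already been keyed by a
unique event id, and the retention period is a placeholder:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the first occurrence of each key (unique event id) and drops repeats
// seen within the retention window; a timer clears the marker afterwards so
// state does not grow without bound.
public class DeduplicateById<T> extends KeyedProcessFunction<String, T, T> {

    private final long retentionMs;
    private transient ValueState<Boolean> seen;

    public DeduplicateById(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(T event, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(event);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + retentionMs);
        }
        // else: duplicate within the retention window, drop it
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        seen.clear();
    }
}

Usage would be something like
stream.keyBy(e -> e.getUniqueId()).process(new DeduplicateById<>(retentionMs)),
where getUniqueId() is whatever id accessor the events actually have.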


On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hello Hemanga,
>
>
>
> MirrorMaker can cause havoc in many respects; for one, it does not have
> strict exactly-once semantics…
>
>
>
> The way I would tackle this problem (and have done in similar situations):
>
>
>
>    - For the source topics that need to have exactly-once semantics
>and that are not intrinsically idempotent:
>- Add one extra operator after the source that deduplicates events by
>unique id for a rolling time range (on the source cloud provider)
>- Take a savepoint after the rolling time-range has passed (at least
>once completely)
>- Move your job to the target cloud provider
>- Reconfigure the resp. source with a new kafka consumer group.id,
>- Change the uid() of the resp. kafka source,
>- Configure start-by-timestamp for the resp. source with a timestamp
>that lies within the rolling time range (of above)
>- Configure the job to ignore  recovery for state that does not have a
>corresponding operator in the job (the previous kafka source uid()s)
>- Start the job on new cloud provider, wait for it to pick up/back-fill
>- Take a savepoint
>- Remove deduplication operator if that causes too much
>load/latency/whatever
>
>
>
> This scheme sounds more complicated than it really is … and has saved my
> sanity quite a number of times 😊
>
>
>
> Good luck and ready to answer more details
>
>
>
> Thias
>
>
>
> *From:* Hemanga Borah 
> *Sent:* Tuesday, May 3, 2022 3:12 AM
> *To:* user@flink.apache.org
> *Subject:* Migrating Flink apps across cloud with state
>
>
>
> Hello,
>  We are attempting to port our Flink applications from one cloud provider
> to another.
>
>  These Flink applications consume data from Kafka topics and output to
> various destinations (Kafka or databases). The applications have states
> stored in them. Some of these stored states are aggregations, for example,
> at times we store hours (or days) worth of data to aggregate over time.
> Some other applications have cached information for data enrichment, for
> example, we store data in Flink state for days, so that we can join them
> with newly arrived data. The amount of data on the input topics is a lot,
> and it will be expensive to reprocess the data from the beginning of the
> topic.
>
>  As such, we want to retain the state of the application when we move to a
> different cloud provider so that we can retain the aggregations and cache,
> and do not have to start from the beginning of the input topics.
>
>  We are replicating the Kafka topics using MirrorMaker 2. This is our
> procedure:
>
>- Replicate the input topics of each Flink application from source
>cloud to destination cloud.
>- Take a savepoint of the Flink application on the source cloud
>provider.
>- Start the Flink application on the destination cloud provider using
>the savepoint from the source cloud provider.
>
>
> However, this does not work as we want because there is a difference in
> offset in the new topics in the new cloud provider (because of MirrorMaker
> implementation). The offsets of the new topic do not match the ones stored
> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
> topic during startup.
>
> Has anyone tried to move clouds while retaining the Flink state?
>
> Thanks,
> Hemanga

Re: Migrating Flink apps across cloud with state

2022-05-03 Thread Hemanga Borah
Any ideas, guys?

On Mon, May 2, 2022 at 6:11 PM Hemanga Borah 
wrote:

> Hello,
>  We are attempting to port our Flink applications from one cloud provider
> to another.
>
>  These Flink applications consume data from Kafka topics and output to
> various destinations (Kafka or databases). The applications have states
> stored in them. Some of these stored states are aggregations, for example,
> at times we store hours (or days) worth of data to aggregate over time.
> Some other applications have cached information for data enrichment, for
> example, we store data in Flink state for days, so that we can join them
> with newly arrived data. The amount of data on the input topics is a lot,
> and it will be expensive to reprocess the data from the beginning of the
> topic.
>
>  As such, we want to retain the state of the application when we move to a
> different cloud provider so that we can retain the aggregations and cache,
> and do not have to start from the beginning of the input topics.
>
>  We are replicating the Kafka topics using MirrorMaker 2. This is our
> procedure:
>
>- Replicate the input topics of each Flink application from source
>cloud to destination cloud.
>- Take a savepoint of the Flink application on the source cloud
>provider.
>- Start the Flink application on the destination cloud provider using
>the savepoint from the source cloud provider.
>
>
> However, this does not work as we want because there is a difference in
> offset in the new topics in the new cloud provider (because of MirrorMaker
> implementation). The offsets of the new topic do not match the ones stored
> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
> topic during startup.
>
> Has anyone tried to move clouds while retaining the Flink state?
>
> Thanks,
> Hemanga
>


Migrating Flink apps across cloud with state

2022-05-02 Thread Hemanga Borah
Hello,
 We are attempting to port our Flink applications from one cloud provider
to another.

 These Flink applications consume data from Kafka topics and output to
various destinations (Kafka or databases). The applications have states
stored in them. Some of these stored states are aggregations, for example,
at times we store hours (or days) worth of data to aggregate over time.
Some other applications have cached information for data enrichment, for
example, we store data in Flink state for days, so that we can join them
with newly arrived data. The amount of data on the input topics is a lot,
and it will be expensive to reprocess the data from the beginning of the
topic.

 As such, we want to retain the state of the application when we move to a
different cloud provider so that we can retain the aggregations and cache,
and do not have to start from the beginning of the input topics.

 We are replicating the Kafka topics using MirrorMaker 2. This is our
procedure:

   - Replicate the input topics of each Flink application from source cloud
   to destination cloud.
   - Take a savepoint of the Flink application on the source cloud provider.
   - Start the Flink application on the destination cloud provider using
   the savepoint from the source cloud provider.


However, this does not work as we want because there is a difference in
offset in the new topics in the new cloud provider (because of MirrorMaker
implementation). The offsets of the new topic do not match the ones stored
on the Flink savepoint, hence, Flink cannot map to the offsets of the new
topic during startup.

Has anyone tried to move clouds while retaining the Flink state?

Thanks,
Hemanga