How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Sachin Mittal
Hi,
I have a long running yarn cluster and I submit my streaming job using the
following command:

flink run -m yarn-cluster -yid application_1473169569237_0001
/usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt
--output file:///output/

Let's say I want to stop this job, make updates to the jar, pass some new
input arguments, and restart the job from the savepoint. How would I do
that?

Would this be the right command?

flink run -s /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab -m
yarn-cluster -yid application_1473169569237_0001
/usr/lib/flink/examples/streaming/WordCount-Updated.jar --input
file:///input1.txt --output file:///output1/ --newarg value123
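
For completeness, a hedged sketch of the step that usually precedes this resubmission: stopping
the running job while taking a savepoint (the job ID is a placeholder; the savepoint directory
and YARN application ID mirror the values above):

flink stop -p /tmp/flink-savepoints -yid application_1473169569237_0001 <jobId>

The savepoint path printed by this command is then what gets passed to -s when submitting the
updated jar, as in the command above.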

Thanks
Sachin


Re: Task Manager memory usage

2024-05-23 Thread Sachin Mittal
Hi,
Where are you storing the state?
Try RocksDB.
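
For reference, a minimal sketch of switching to the RocksDB state backend in flink-conf.yaml
(the checkpoint directory is a placeholder; any durable location such as HDFS or S3 works):

state.backend.type: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink-checkpoints

With RocksDB, keyed state lives mostly off-heap on local disk rather than on the JVM heap,
which is why it is often the first thing to try when task manager heap usage keeps growing.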

Thanks
Sachin


On Thu, 23 May 2024 at 6:19 PM, Sigalit Eliazov  wrote:

> Hi,
>
> I am trying to understand the following behavior in our Flink application
> cluster. Any assistance would be appreciated.
>
> We are running a Flink application cluster with 5 task managers, each with
> the following configuration:
>
>- jobManagerMemory: 12g
>- taskManagerMemory: 20g
>- taskManagerMemoryHeapSize: 12g
>- taskManagerMemoryNetworkMax: 4g
>- taskManagerMemoryNetworkMin: 1g
>- taskManagerMemoryManagedSize: 50m
>- taskManagerMemoryOffHeapSize: 2g
>- taskManagerMemoryNetworkFraction: 0.2
>- taskManagerNetworkMemorySegmentSize: 4mb
>- taskManagerMemoryFloatingBuffersPerGate: 64
>- taskmanager.memory.jvm-overhead.min: 256mb
>- taskmanager.memory.jvm-overhead.max: 2g
>- taskmanager.memory.jvm-overhead.fraction: 0.1
>
> Our pipeline includes stateful transformations, and we are verifying that
> we clear the state once it is no longer needed.
>
> Through the Flink UI, we observe that the heap size increases and
> decreases during the job lifecycle.
>
> However, there is a noticeable delay between clearing the state and the
> reduction in heap size usage, which I assume is related to the garbage
> collector frequency.
>
> What is puzzling is the task manager pod memory usage. It appears that the
> memory usage increases intermittently and is not released. We verified the
> different state metrics and confirmed they are changing according to the
> logic.
>
> Additionally, if we had a state that was never released, I would expect to
> see the heap size increasing constantly as well.
>
> Any insights or ideas?
>
> Thanks,
>
> Sigalit
>


Re: What is the best way to aggregate data over a long window

2024-05-17 Thread Sachin Mittal
Hi,
I am doing the following:
1. I use the reduce function where the data type of the output after windowing is
the same as the input.
2. Where the data type of the output after windowing is different from that of the
input, I use the aggregate function. For example:

SingleOutputStreamOperator data =
    reducedPlayerStatsData
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(secs)))
        .aggregate(new DataAggregator())
        .name("aggregate");

In this case the aggregated data is of a different type than the input, so I
had to use the aggregate function.
However, in cases where the data type stays the same, the reduce function is
very simple to use.
Is there any fundamental difference between the aggregate and reduce functions
in terms of performance?
3. I have enabled incremental checkpoints at the Flink conf level using:
state.backend.type: "rocksdb"
state.backend.incremental: "true"

4. I am really not sure how I can use TTL. I assumed that Flink would
automatically clean the state of windows once they expire? Is there any
way I can use TTL in the steps I have mentioned?
5. When you talk about pre-aggregation, is this what you mean: first
compute a minute-level aggregation and use that as input for the hour-level
aggregation? So my pipeline would be something like this:

SingleOutputStreamOperator reducedData =
    data
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .reduce(new DataReducer())
        .keyBy(new KeySelector())   // re-key before the second, coarser window
        .window(TumblingEventTimeWindows.of(Time.seconds(3600)))
        .reduce(new DataReducer())
        .name("reduce");


I was thinking of performing incremental aggregation using stateful processing.

Basically, read one record, reduce it and store it in state, then read the
next record, reduce it against the current state, and write the new reduced
value back to the state, and so on.

Then fire the final reduced value from the state at the end of the event-time
period registered with my event timer, update the timer to the next event
time, and clean the state.

This way the state would always keep only one record per key, no matter what
period we aggregate data for.

Is this a better approach than windowing?
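
For illustration, a minimal sketch of this stateful approach (the Data type, the String key
and the hourly bucket size are assumptions, and reduce() is a placeholder for the real logic):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class IncrementalReducer extends KeyedProcessFunction<String, Data, Data> {

  private static final long BUCKET_MS = 3_600_000L; // 1-hour bucket, assumed
  private transient ValueState<Data> aggState;

  @Override
  public void open(Configuration parameters) {
    aggState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("agg", Data.class));
  }

  @Override
  public void processElement(Data value, Context ctx, Collector<Data> out) throws Exception {
    // keep exactly one reduced record per key in state
    Data current = aggState.value();
    aggState.update(current == null ? value : reduce(current, value));
    // fire at the end of the event-time bucket this record falls into
    long bucketEnd = (ctx.timestamp() / BUCKET_MS + 1) * BUCKET_MS;
    ctx.timerService().registerEventTimeTimer(bucketEnd);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Data> out) throws Exception {
    Data result = aggState.value();
    if (result != null) {
      out.collect(result);
    }
    aggState.clear(); // free the state for the next bucket
  }

  private Data reduce(Data a, Data b) {
    // placeholder for the actual reduce logic
    return a;
  }
}

If an age-based safety net is wanted for this kind of hand-managed keyed state (the TTL question
in point 4), a StateTtlConfig can be attached to the ValueStateDescriptor via enableTimeToLive
before the state is first used; for regular windows, Flink clears the window state automatically
once the window plus its allowed lateness has expired.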


Thanks
Sachin


On Fri, May 17, 2024 at 1:14 PM gongzhongqiang 
wrote:

> Hi  Sachin,
>
> We can optimize this problem in the following ways:
> - use
> org.apache.flink.streaming.api.datastream.WindowedStream#aggregate(org.apache.flink.api.common.functions.AggregateFunction)
> to reduce the amount of data
> - use TTL to clean up data that is no longer needed
> - enable incremental checkpoints
> - use
> multi-level time window granularity for pre-aggregation, which can significantly
> improve performance and reduce computation latency
>
> Best,
> Zhongqiang Gong
>
> Sachin Mittal  于2024年5月17日周五 03:48写道:
>
>> Hi,
>> My pipeline step is something like this:
>>
>> SingleOutputStreamOperator reducedData =
>> data
>> .keyBy(new KeySelector())
>> .window(
>> TumblingEventTimeWindows.of(Time.seconds(secs)))
>> .reduce(new DataReducer())
>> .name("reduce");
>>
>>
>> This works fine for secs = 300.
>> However once I increase the time window to say 1 hour or 3600 the state
>> size increases as now it has a lot more records to reduce.
>>
>> Hence I need to allocate much more memory to the task manager.
>>
>> However there is no upper limit to this memory allocated. If the volume
>> of data increases by say 10 fold I would have no option but to again
>> increase the memory.
>>
>> Is there a better way to perform long window aggregation so overall this
>> step has a small memory footprint.
>>
>> Thanks
>> Sachin
>>
>>


What is the best way to aggregate data over a long window

2024-05-16 Thread Sachin Mittal
Hi,
My pipeline step is something like this:

SingleOutputStreamOperator reducedData =
    data
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(secs)))
        .reduce(new DataReducer())
        .name("reduce");


This works fine for secs = 300.
However, once I increase the time window to, say, 1 hour (secs = 3600), the state
size increases, as it now has a lot more records to reduce.

Hence I need to allocate much more memory to the task manager.

However, there is no upper limit to this memory allocation. If the volume of
data increases by, say, 10 fold, I would have no option but to again increase
the memory.

Is there a better way to perform long-window aggregation so that overall this
step has a small memory footprint?

Thanks
Sachin


Re: how to reduce read times when many jobs read the same kafka topic?

2024-05-14 Thread Sachin Mittal
Each separate job would have its own consumer group hence they will read
independently from the same topic and when checkpointing they will commit
their own offsets.
So if any job fails, it will not affect the progress of other jobs when
reading from Kafka.

I am not sure of the impact of network load when multiple consumer groups
are requesting data from the same topic.

Multiple small jobs ensure that each job is scaled and monitored in an
isolated way.

Having an efficient serde helps a lot with the data we store in state, the
data forwarded to the next steps, and overall state management.

Another thing you can look into: if your job step is keyed by some key,
make sure it is keyed by a String or another Java primitive type,
since object keys are much slower when reading from and writing to a state
store.

Thanks
Sachin


On Wed, May 15, 2024 at 7:58 AM longfeng Xu 
wrote:

> Thank you . we will try .
>
> I'm still confused about multiple jobs on a cluster (flink-session-yarn)
> reading the same topic from a Kafka cluster. I understand that in this mode,
> the number of times the topic is read has not decreased; it just shares the
> TCP channel of the task manager, reducing the network load. Is my
> understanding correct?
>
> Or are there any other advantages to it? Please advise. Thank you.
>
> Sachin Mittal  于2024年5月15日周三 09:24写道:
>
>> We have the same scenario.
>> We thought of having one big job with multiple branches, but this leads to a
>> single point of failure, as an issue with any branch would lead to the whole
>> job failing and all the other branches would stop processing.
>>
>> Hence running multiple jobs on a cluster (say YARN) is better.
>>
>> Now, to overcome the serde issue, try to use some of the more efficient schemes
>> recommended by Flink. We are using POJOs and it has yielded good results
>> for us.
>>
>>
>> On Wed, 15 May 2024 at 5:59 AM, longfeng Xu 
>> wrote:
>>
>>> Hi,
>>>   there are many Flink jobs reading one Kafka topic in this scenario;
>>> therefore CPU resources are wasted on serialization/deserialization and the
>>> network load is too heavy. Can you recommend a solution to avoid this
>>> situation? E.g., would it be more effective to use one large streaming job with
>>> multiple branches?
>>>
>>>  Best regards,
>>>
>>>


How can we exclude operator level metrics from getting reported

2024-05-11 Thread Sachin Mittal
Hi
I have a following metrics configuration:

metrics.reporters: stsd
metrics.reporter.stsd.factory.class:
org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: '8125'
metrics.reporter.stsd.filter.excludes: *.operator:*:*

metrics.scope.jm: jm
metrics.scope.jm-job: jm.
metrics.scope.tm: tm
metrics.scope.tm-job: tm.
metrics.scope.task: tm...
metrics.scope.operator: tm...

I would like to exclude operator-level metrics from getting reported via the
statsd reporter.
Can anyone please confirm that my setting for
metrics.reporter.stsd.filter.excludes
is correct for achieving this?

Thanks
Sachin


Re: Understanding default firings in case of allowed lateness

2024-04-17 Thread Sachin Mittal
Hi Xuyang,

So if I go the side-output way, then my pipeline would be something like
this:

final OutputTag lateOutputTag = new OutputTag("late-data"){};

SingleOutputStreamOperator reducedDataStream =
    dataStream
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .allowedLateness(Time.seconds(180))
        .sideOutputLateData(lateOutputTag)
        .reduce(new MyDataReducer());

DataStream lateStream = reducedDataStream.getSideOutput(lateOutputTag);

lateStream
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(480))
    .reduce(new MyDataReducer());


So basically I collect all the late data into another stream and apply the
same transformations on it again, to get reduced data from this late data.
Is this the correct way to get reduced data from late data only?

Also, I have a few more queries:
1. For this late-records stream to not drop records, would I have to set the
allowed lateness to a larger value than what I had set in the first stream
transformation?
   Basically, do I need to set any allowed lateness for the window operation
on the late data stream if I want to also reduce the late records the same way
as the in-time records?

2. Also, when I collect late data as a side output, would the reduce function
now only contain the data reduced from in-time records, with no late
records included in the subsequent reduced data?

Basically after this the output of reduced data will only contain:

[ reduceData (d1, d2, d3) ]

and not any data like:
reducedData(d1, d2, d3, late d4, late d5)  or reducedData(d1, d2, d3, late
d4, late d5, late d6)

And the transformation of the late data stream would now contain reduced data from:

[ reducedData(late d4, late d5, late d6) ]

Thanks
Sachin


On Wed, Apr 17, 2024 at 4:05 PM Xuyang  wrote:

> Hi, Sachin.
>
> IIUC, it is the second situation you listed, that is:
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
> d6) ].
> However, because of `table.exec.emit.late-fire.delay`, it could also be
> something like
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5),
> reducedData(d1, d2, d3, late d4, late d5, late d6) ]
>
> Actually, allowed lateness (table.exec.emit.allow-lateness) is used to
> control when the window decides not to change the value of its output any
> more, allowing the framework to automatically clear the corresponding state.
>
> > Also if I want the reduced data from late records to not include the
> data emitted within the window bounds, how can I do the same ?
> or if this is handled as default case ?
>
> Maybe side output[1] can help you to collect the late data and re-compute
> them.
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
>
> --
> Best!
> Xuyang
>
>
> At 2024-04-17 16:56:54, "Sachin Mittal"  wrote:
>
> Hi,
>
> Suppose my pipeline is:
>
> data
> .keyBy(new MyKeySelector())
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .allowedLateness(Time.seconds(180))
> .reduce(new MyDataReducer())
>
>
> So I wanted to know if the final output stream would contain reduced data
> at the end of the window mark and also another reduced data at the end of
> allowed lateness ?
> If that is the case, then the reduced data at the end of allowed lateness
> would also include the data from non late records or it will only include
> reduced data from late records.
>
> Example
>
> If I have data in sequence:
>
> [window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end
> of allowed lateness]
>
> The resultant stream after window and reduce operation would be:
>
> [ reduceData (d1, d2, d3), reducedData(late d4, late d5, late d6) ]
>
> or
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
> d6) ]
>
> or something else ?
>
> Also if I want the reduced data from late records to not include the data
> emitted within the window bounds, how can I do the same ?
> or if this is handled as default case ?
>
> Thanks
> Sachin
>
>
>
>
>


Understanding default firings in case of allowed lateness

2024-04-17 Thread Sachin Mittal
Hi,

Suppose my pipeline is:

data
.keyBy(new MyKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.allowedLateness(Time.seconds(180))
.reduce(new MyDataReducer())


So I wanted to know whether the final output stream would contain reduced data
at the end of the window mark and also another reduced result at the end of
the allowed lateness?
If that is the case, would the reduced data at the end of the allowed lateness
also include the data from non-late records, or would it only include
reduced data from late records?

Example

If I have data in sequence:

[window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end
of allowed lateness]

The resultant stream after window and reduce operation would be:

[ reduceData (d1, d2, d3), reducedData(late d4, late d5, late d6) ]

or
[ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
d6) ]

or something else ?

Also, if I want the reduced data from late records to not include the data
emitted within the window bounds, how can I do that?
Or is this handled as the default case?

Thanks
Sachin


Re: Understanding event time wrt watermarking strategy in flink

2024-04-15 Thread Sachin Mittal
Hi Yunfeng,
So regarding the dropping of records for the out-of-order watermark: let's say
records older than T - B will be dropped by the first operator after
watermarking, which is reading from the source.
Then these records will never be forwarded to the step where we do
event-time windowing; those records will never arrive at that step.

Hence records with timestamp T - A - B will never reach my windowing
operator, to get collected by the side outputs.

Is this understanding correct?

If this is the case, then shouldn't A be less than B, to at least collect
those records so they get included in a particular window?

Basically, having an allowed lateness A greater than the out-of-order bound B
would not make sense, as records older than T - B would have already been
dropped at the source itself.

Please let me know if I am understanding this correctly, or am I missing
something?
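
For concreteness, the two knobs being compared here would typically be set like this (a hedged
sketch; the Data type, field names and the concrete values are assumptions):

final DataStream<Data> data =
    env.fromSource(
        source,
        WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60)) // B = 60s
            .withTimestampAssigner((event, ts) -> event.timestamp),
        "source");

data
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(180))                                          // A = 180s
    .reduce(new MyDataReducer());

Here B is the out-of-order bound discussed above and A is the allowed lateness; the question in
this thread is how the two interact.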

Thanks
Sachin


On Mon, Apr 15, 2024 at 6:56 AM Yunfeng Zhou 
wrote:

> Hi Sachin,
>
> Firstly, sorry for my misunderstanding about watermarking in the last
> email. When you configure an out-of-orderness watermark with a
> tolerance of B, the next watermark emitted after a record with
> timestamp T would be T - B instead of the T described in my last email.
>
> Then let's go back to your question. When the Flink job receives n
> records with timestamps Tn, it will set the timestamp of the next
> watermark to be max(Tn) - B - 1ms, and that watermark will be emitted
> after the next autoWatermarkInterval is reached. So a record with
> timestamp T will not influence records less than T - B immediately;
> instead it influences the next watermark, and that watermark afterwards
> influences those records.
>
> As for the influence on those late records, Flink operators will drop
> them by default, but you can also gather them for other downstream
> logics. Please refer to
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>
> Based on the analysis above, if you configure allowed lateness to A,
> records with timestamps less than T - A - B will be dropped or
> gathered as side outputs.
>
> Best,
> Yunfeng
>
> On Fri, Apr 12, 2024 at 6:34 PM Sachin Mittal  wrote:
> >
> > Hi Yunfeng,
> > I have a question around the tolerance for out of order bound
> watermarking,
> >
> > What I understand that when consuming from source with out of order
> bound set as B, lets say it gets a record with timestamp T.
> > After that it will drop all the subsequent records which arrive with the
> timestamp less than T - B.
> >
> > Please let me know if I understood this correctly.
> >
> > If this is correct, then how does allowed lateness when performing event
> time windowing works ?  Say allowed lateness is set as A,
> > does this mean that value of A should be less than that of B because
> records with timestamp less than T - B would have already been dropped at
> the source.
> >
> > If this is not the case than how does lateness work with our of order
> boundedness ?
> >
> > Thanks
> > Sachin
> >
> >
> > On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou <
> flink.zhouyunf...@gmail.com> wrote:
> >>
> >> Hi Sachin,
> >>
> >> 1. When your Flink job performs an operation like map or flatmap, the
> >> output records would be automatically assigned with the same timestamp
> >> as the input record. You don't need to manually assign the timestamp
> >> in each step. So the windowing result in your example should be as you
> >> have expected.
> >>
> >> 2. The frequency of watermarks can be configured by
> >> pipeline.auto-watermark-interval in flink-conf.yaml, or
> >> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
> >> the event time related to the Watermark is still T, just that the job
> >> will tolerate any records whose timestamp is in range [T-B, T].
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal 
> wrote:
> >> >
> >> > Hello folks,
> >> > I have few questions:
> >> >
> >> > Say I have a source like this:
> >> >
> >> > final DataStream data =
> >> > env.fromSource(
> >> > source,
> >> >
>  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >> > .withTimestampAssigner((event, timestamp) ->
> event.timestamp));
> >> >
> >> >
> >> > My pipeline after this is as followed:
> >> >
> >> > data.flatMap(new MyFlatte

Re: Understanding event time wrt watermarking strategy in flink

2024-04-12 Thread Sachin Mittal
Hi Yunfeng,
I have a question around the tolerance for out-of-order bounded watermarking.

What I understand is that when consuming from a source with the out-of-order
bound set as B, let's say it gets a record with timestamp T.
After that, it will drop all subsequent records that arrive with a
timestamp less than T - B.

Please let me know if I understood this correctly.

If this is correct, then how does allowed lateness work when performing
event-time windowing?  Say allowed lateness is set as A;
does this mean that the value of A should be less than that of B, because
records with timestamps less than T - B would have already been dropped at
the source?

If this is not the case, then how does lateness work with out-of-order
boundedness?

Thanks
Sachin


On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou 
wrote:

> Hi Sachin,
>
> 1. When your Flink job performs an operation like map or flatmap, the
> output records would be automatically assigned with the same timestamp
> as the input record. You don't need to manually assign the timestamp
> in each step. So the windowing result in your example should be as you
> have expected.
>
> 2. The frequency of watermarks can be configured by
> pipeline.auto-watermark-interval in flink-conf.yaml, or
> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
> the event time related to the Watermark is still T, just that the job
> will tolerate any records whose timestamp is in range [T-B, T].
>
> Best,
> Yunfeng
>
> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal  wrote:
> >
> > Hello folks,
> > I have few questions:
> >
> > Say I have a source like this:
> >
> > final DataStream data =
> > env.fromSource(
> > source,
> >
>  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> > .withTimestampAssigner((event, timestamp) ->
> event.timestamp));
> >
> >
> > My pipeline after this is as followed:
> >
> > data.flatMap(new MyFlattendData())
> > .keyBy(new MyKeySelector())
> > .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> > .reduce(new MyReducer());
> >
> >
> > First question I have is that the timestamp I assign from the source,
> would it get carried to all steps below to my window ?
> > Example say I have timestamped data from source as:
> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
> >
> >  would this get flattened to say:
> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
> >
> > then keyed to say:
> > => [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1,
> flatdata4]),...]
> >
> > windows:
> > 1st => [ flatdata1, flatdata2 ]
> > 2nd => [ flatdata4, ... ]
> >
> > Would the windows created before the reduce function be applied be like
> I have illustrated or to have it this way, do I need to output a record at
> each step with the timestamp assigned for that record ?
> >
> > Basically is the timestamp assigned when reading from the source pushed
> (retained) down to all the steps below when doing event time window
> operation ?
> >
> >
> > Next question is in my watermark strategy: how do I set the period of
> the watermarking.
> > Basically from An out-of-order bound B means that once an event with
> timestamp T was encountered, no events older than T - B will follow any
> more when the watermarking is done.
> >
> > However, how frequently is watermarking done and when say watermarking,
> the last encountered event was with timestamp T , does this mean watermark
> timestamp would be T - B ?
> >
> > How can we control the watermarking period ?
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> >
> >
> >
> >
>


Understanding event time wrt watermarking strategy in flink

2024-04-11 Thread Sachin Mittal
Hello folks,
I have few questions:

Say I have a source like this:

final DataStream data =
env.fromSource(
source,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
.withTimestampAssigner((event, timestamp) -> event.timestamp));


My pipeline after this is as followed:

data.flatMap(new MyFlattendData())
.keyBy(new MyKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.reduce(new MyReducer());


First question: would the timestamp I assign at the source get carried
through all the steps below, down to my window?
For example, say I have timestamped data from the source as:
=> [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]

 would this get flattened to say:
=> [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]

then keyed to say:
=> [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1,
flatdata4]),...]

windows:
1st => [ flatdata1, flatdata2 ]
2nd => [ flatdata4, ... ]

Would the windows created before the reduce function is applied look like I
have illustrated, or, to have it this way, do I need to output a record at
each step with the timestamp assigned for that record?

Basically, is the timestamp assigned when reading from the source pushed
(retained) down to all the steps below when doing an event-time window
operation?


Next question is about my watermark strategy: how do I set the period of the
watermarking?
Basically, from the docs, an out-of-order bound B means that once an event with
timestamp T has been encountered, no events older than T - B will follow any
more once the watermarking is done.

However, how frequently is watermarking done, and when watermarking happens and
the last encountered event had timestamp T, does this mean the watermark
timestamp would be T - B?

How can we control the watermarking period?
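
For reference, the emission period asked about here is the auto-watermark interval mentioned in
the replies; a hedged sketch (200 ms is just an example value):

# flink-conf.yaml
pipeline.auto-watermark-interval: 200 ms

// or programmatically on the StreamExecutionEnvironment
env.getConfig().setAutoWatermarkInterval(200L);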

Thanks
Sachin


How are window's boundaries decided in flink

2024-04-10 Thread Sachin Mittal
Hi,
Let's say I have defined 1-minute TumblingEventTimeWindows.
So it will create windows as:
(0, 60), (60, 120), ...

Now let's say I have an event at time t = 60.
In which window would this get aggregated?
The first, the second, or both?

Say I want this to be aggregated only in the second window; how can I
achieve this?


Thanks
Sachin


Re: How to debug window step in flink

2024-04-08 Thread Sachin Mittal
Hi,
Yes, it was a watermarking issue. There were a few out-of-order records in my
stream, and as per the watermarking strategy the watermark was advanced into
the future, hence current events were getting discarded.
I have fixed this by not processing future-timestamped records.
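
For illustration, the fix described above amounts to something like the following hedged sketch
(the Data type and its timestamp field are assumptions), applied before watermarks are generated:

DataStream<Data> sanitized = data
    // drop records whose event time is ahead of wall-clock time so they cannot
    // drag the watermark into the future and make in-time records look late
    .filter(d -> d.timestamp <= System.currentTimeMillis());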

Thanks
Sachin


On Mon, Apr 8, 2024 at 11:55 AM  wrote:

> Hi Sachin
>
>
>
> What exactly does the MyReducer do? Can you provide us with some code?
>
>
>
> Just a wild guess from my side, did you check the watermarking? If the
> Watermarks aren't progressing there's no way for Flink to know when to emit
> a window and therefore you won't see any outgoing events.
>
> Kind Regards
>
> Dominik
>
>
>
> From: Sachin Mittal 
> Date: Monday, 8 April 2024 at 08:17
> To: user@flink.apache.org 
> Subject: How to debug window step in flink
>
>
>
>
> Hi,
>
>
>
> I have a following windowing step in my pipeline:
>
>
>
> inputData
>     .keyBy(new MyKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>     .reduce(new MyReducer())
>     .name("MyReducer");
>
>
>
> Same step when I see in Flink UI shows as:
>
>
>
> Window(TumblingEventTimeWindows(6), EventTimeTrigger, MyReducer,
> PassThroughWindowFunction) :- Sink: MyData sink +- Filter
>
>
>
>
>
> So far I don't see any errors in the pipeline.
>
>
>
> However when I check the following metrics:
>
> 0.MyReducer.numRecordsInPerSecond = 600 / s
> 0.MyReducer.numRecordsOutPerSecond = 0 / s
>
>
>
> It shows that the step is receiving data but it is not outputting anything.
>
> Looks like some problem in the step.
>
>
>
> Most obvious thing that I can deduce is that the window step may not be
> outputting any record.
>
> Also MyReducer is a pretty simple function and it does not catch any
> exception, so if there was a problem in this function it would have been
> thrown to the pipeline
>
> and I would have known.
>
>
>
> Please let me know how I can debug this better.
>
>
>
> Thanks
>
> Sachin
>
>
>
>
>
>
>


How to debug window step in flink

2024-04-08 Thread Sachin Mittal
Hi,

I have a following windowing step in my pipeline:

inputData
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .reduce(new MyReducer())
    .name("MyReducer");


Same step when I see in Flink UI shows as:

Window(TumblingEventTimeWindows(6), EventTimeTrigger, MyReducer,
PassThroughWindowFunction) :- Sink: MyData sink +- Filter


So far I don't see any errors in the pipeline.

However when I check the following metrics:
0.MyReducer.numRecordsInPerSecond = 600 / s
0.MyReducer.numRecordsOutPerSecond = 0 / s

It shows that the step is receiving data but is not outputting anything.
It looks like there is some problem in the step.

The most obvious thing I can deduce is that the window step may not be
outputting any records.
Also, MyReducer is a pretty simple function and it does not catch any
exceptions, so if there was a problem in this function it would have been
thrown up the pipeline
and I would have known.

Please let me know how I can debug this better.

Thanks
Sachin


How to handle tuple keys with null values

2024-04-02 Thread Sachin Mittal
Hello folks,
I am keying my stream using a Tuple:

example:

public class MyKeySelector implements KeySelector<Data, Tuple2<Long, Long>> {

  @Override
  public Tuple2<Long, Long> getKey(Data data) {
    return Tuple2.of(data.id, data.id1);
  }
}

Now id1 can have null values. In this case how should I handle this?

Right now I am getting this error:

java.lang.RuntimeException: Exception occurred while setting the
current key context.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
[flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.apache.flink.types.NullFieldException: Field 1 is null,
but expected to hold a value.
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
~[flink-dist-1.17.1.jar:1.17.1]
... 18 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:67)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:30)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:133)
~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
~[flink-dist-1.17.1.jar:1.17.1]


Thanks
Sachin


Re: Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-22 Thread Sachin Mittal
So, when we create an EMR cluster, the NN service runs on the primary node
of the cluster.
Now, at the time of creating the cluster, how can we specify the name of
this NN in the format hdfs://namenode-host:8020/ ?

Is there a standard name by which we can identify the NN server?
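
For what it's worth, a hedged sketch of a common way to configure this (assuming the cluster's
core-site.xml already sets fs.defaultFS, as EMR does, so the authority can be omitted):

state.backend.type: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink-checkpoints
# or spelled out explicitly against the primary node:
# state.checkpoints.dir: hdfs://<primary-node-dns>:8020/flink-checkpoints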

Thanks
Sachin


On Fri, Mar 22, 2024 at 12:08 PM Asimansu Bera 
wrote:

> Hello Sachin,
>
> Typically, Cloud VMs are ephemeral, meaning that if the EMR cluster goes
> down or VMs are required to be shut down for security updates or due to
> faults, new VMs will be added to the cluster. As a result, any data stored
> in the local file system, such as file:///tmp, would be lost. To ensure data
> persistence and prevent loss of checkpoint or savepoint data for recovery,
> it is advisable to store such data in a persistent storage solution like
> HDFS or S3.
>
> Generally, the Hadoop NN on EMR runs on port 8020. You may find the NN IP
> details from the EMR service.
>
> Hope this helps.
>
> -A
>
>
> On Thu, Mar 21, 2024 at 10:54 PM Sachin Mittal  wrote:
>
>> Hi,
>> We are using AWS EMR where we can submit our flink jobs to a long running
>> flink cluster on Yarn.
>>
>> We wanted to configure RocksDBStateBackend as our state backend to store
>> our checkpoints.
>>
>> So we have configured following properties in our flink-conf.yaml
>>
>>- state.backend.type: rocksdb
>>- state.checkpoints.dir: file:///tmp
>>- state.backend.incremental: true
>>
>>
>> My question here is regarding the checkpoint location: what is the
>> difference between the location if it is a local filesystem vs a hadoop
>> distributed file system (hdfs).
>>
>> What advantages we get if we use:
>>
>> *state.checkpoints.dir*: hdfs://namenode-host:port/flink-checkpoints
>> vs
>> *state.checkpoints.dir*: file:///tmp
>>
>> Also if we decide to use HDFS then from where we can get the value for
>> *namenode-host:port*
>> given we are running Flink on an EMR.
>>
>> Thanks
>> Sachin
>>
>>
>>


Re: Flink unable to read from kafka source due to starting offset strategy

2024-03-22 Thread Sachin Mittal
Hi,
After some debugging I see these in the logs:

2024-03-22 14:25:47,555 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 11 due to request timeout.
2024-03-22 14:25:47,647 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 8 due to node 11 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30026ms, request timeout: 3ms)
2024-03-22 14:25:47,650 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 12 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 9 due to node 12 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30036ms, request timeout: 3ms)
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 14 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 7 due to node 14 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30026ms, request timeout: 3ms)
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 15 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 10 due to node 15 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30026ms, request timeout: 3ms)
2024-03-22 14:25:47,652 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 11:
org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,657 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 12:
org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,657 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 14:
org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,658 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 15:
org.apache.kafka.common.errors.DisconnectException: null



I think I have started seeing this only recently when I tinkered with the
taskmanager.memory.process.size.
I think I reduced this from 4g to 2g.

Any idea why the consumer network client is getting disconnected? Is
this because this thread is not getting enough resources, or something else?

Thanks
Sachin


On Fri, Mar 22, 2024 at 12:48 PM Sachin Mittal  wrote:

> Hi,
> I was experimenting with different starting offset strategies for my Flink
> job, especially in cases where jobs are canceled and scheduled again
> and I would like to start with the last committed offset and if the same
> is not available then start from the latest.
>
> So I decided to use this:
>
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>
>
> Now when I start my job in the job master I get is:
>
> Assigning splits to readers {0=[[Partition: mytopic-1, StartingOffset: -3,
> StoppingOffset: -9223372036854775808],
> [Partition: mytopic-2, StartingOffset: -3, StoppingOffset: -
> 9223372036854775808]]}
>
> Looks like here both starting and stopping offsets are negative, I am not
> sure if this is correct or not.
> However what is happening is that no records are getting read from the
> Kafka source.
>
> Can anyone please tell me what is the right starting offset strategy to
> follow, where a new job is started from last committed offsets or latest.
>
> Also please note that if I just keep the starting offset strategy as:
>
> .setStartingOffsets(OffsetsInitializer.committedOffsets())
>
> Now say I have cancelled the job and started again at a much later date, then 
> the committed offset will not longer be available in Kafka topic,
>
> as the data would have been discarded based on topic retention policy.
>
>
> Hence just using the committed offsets strategy does not always work.
>
>
> Thanks
> Sachin
>
>
>


Re: Flink unable to read from kafka source due to starting offset strategy

2024-03-22 Thread Sachin Mittal
Hi,
I was experimenting with different starting-offset strategies for my Flink
job, especially in cases where jobs are canceled and scheduled again,
and I would like to start from the last committed offset, and if that is
not available, then start from the latest.

So I decided to use this:

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))


Now when I start my job in the job master I get is:

Assigning splits to readers {0=[[Partition: mytopic-1, StartingOffset: -3,
StoppingOffset: -9223372036854775808],
[Partition: mytopic-2, StartingOffset: -3, StoppingOffset: -
9223372036854775808]]}

It looks like both the starting and stopping offsets are negative here; I am
not sure if this is correct or not.
However, what is happening is that no records are getting read from the
Kafka source.

Can anyone please tell me the right starting-offset strategy to follow,
where a new job starts from the last committed offsets, or from the latest?

Also please note that if I just keep the starting offset strategy as:

.setStartingOffsets(OffsetsInitializer.committedOffsets())

and say I have cancelled the job and started it again at a much later
date, then the committed offset will no longer be available in the Kafka
topic, as the data would have been discarded based on the topic retention
policy.

Hence just using the committed-offsets strategy does not always work.
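
For context, a minimal sketch of how this fallback initializer is typically wired into a
KafkaSource (topic, brokers, group id and deserializer are placeholders):

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker-1:9092")
    .setTopics("mytopic")
    .setGroupId("spflink")
    // resume from the committed group offsets; fall back to LATEST when none exist
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();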


Thanks
Sachin


Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-21 Thread Sachin Mittal
Hi,
We are using AWS EMR where we can submit our flink jobs to a long running
flink cluster on Yarn.

We wanted to configure RocksDBStateBackend as our state backend to store
our checkpoints.

So we have configured following properties in our flink-conf.yaml

   - state.backend.type: rocksdb
   - state.checkpoints.dir: file:///tmp
   - state.backend.incremental: true


My question here is regarding the checkpoint location: what is the
difference between the location if it is a local filesystem vs a hadoop
distributed file system (hdfs).

What advantages we get if we use:

state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
vs
state.checkpoints.dir: file:///tmp

Also, if we decide to use HDFS, then where can we get the value for
namenode-host:port,
given we are running Flink on EMR?

Thanks
Sachin


Re: Need help in understanding PojoSerializer

2024-03-20 Thread Sachin Mittal
Hi,
I saw the post, but I did not understand how I would configure these fields
to use those serializers. (I can change the Set type to a List type for
now.)
As per the docs, I see that we can annotate fields with @TypeInfo.

But what I did not get is how, using this annotation, I can use ListSerializer
and MapSerializer.
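
For what it's worth, a hedged sketch of how @TypeInfo is usually wired to a TypeInfoFactory that
hands back list/map type information (which is what ends up using ListSerializer and
MapSerializer under the hood); the element types here are assumptions:

import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class A {

  public String str;

  @TypeInfo(StringListFactory.class)
  public List<String> aList;        // assumed element type

  @TypeInfo(StringLongMapFactory.class)
  public Map<String, Long> dMap;    // assumed key/value types

  public static class StringListFactory extends TypeInfoFactory<List<String>> {
    @Override
    public TypeInformation<List<String>> createTypeInfo(
        Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.LIST(Types.STRING);
    }
  }

  public static class StringLongMapFactory extends TypeInfoFactory<Map<String, Long>> {
    @Override
    public TypeInformation<Map<String, Long>> createTypeInfo(
        Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.MAP(Types.STRING, Types.LONG);
    }
  }
}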

Thanks
Sachin


On Wed, Mar 20, 2024 at 10:47 PM Ken Krugler 
wrote:

> Flink doesn’t have built-in support for serializing Sets.
>
> See this (stale) issue about the same:
> https://issues.apache.org/jira/browse/FLINK-16729
>
> You could create a custom serializer for sets, see
> https://stackoverflow.com/questions/59800851/flink-serialization-of-java-util-list-and-java-util-map
> and
> https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/ListSerializer.html
> for details on how this was done for a list, but it’s not trivial.
>
> Or as a hack, use a Map and the existing support for map
> serialization via
> https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/MapSerializer.html
>
> — Ken
>
>
> On Mar 20, 2024, at 10:04 AM, Sachin Mittal  wrote:
>
> Hi,
> I have a Pojo class like this
>
> public class A {
>
> public String str;
>
> public Set aSet;
>
> public Map dMap;
>
> }
>
> However when I start the flink program I get this message:
>
> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
> dMap will be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance and schema evolution.
>
> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
> aSet will be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance and schema evolution.
>
> Also in my code I have added
>
> env.getConfig().disableGenericTypes();
>
> So I don't understand when I use Maps and Sets of primitive types why is 
> Flink not
>
> able to use PojoSerializer for these fields and even when I have disabled 
> generics types.
>
> why I am getting message that it will be processed as GenericType?
>
>
> Any help in understanding what I need to do to ensure all the fields of my 
> object are handled using PojoSerializer.
>
>
> Thanks
>
> Sachin
>
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink & Pinot
>
>
>
>


Need help in understanding PojoSerializer

2024-03-20 Thread Sachin Mittal
Hi,
I have a Pojo class like this

public class A {

public String str;

public Set aSet;

public Map dMap;

}

However when I start the flink program I get this message:

org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
dMap will be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance
and schema evolution.

org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
aSet will be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance
and schema evolution.

Also in my code I have added

env.getConfig().disableGenericTypes();

So I don't understand: when I use Maps and Sets of primitive types, why is
Flink not able to use the PojoSerializer for these fields, even when I have
disabled generic types?

Why am I getting the message that they will be processed as GenericType?


Any help in understanding what I need to do to ensure all the fields
of my object are handled using the PojoSerializer would be appreciated.


Thanks

Sachin


Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
Hi Hang,
I have checked my fat jar, and the same class is not packaged in my
jar.

I have also searched for this issue in the mail archives, and the same
issue was posted a few months back:

https://www.mail-archive.com/user@flink.apache.org/msg52035.html

The solution there was to simply downgrade to Flink version 1.17.

Does this ring a bell? Is there an issue with Flink 1.18 when we try to
submit jobs via YARN on AWS EMR?

Thanks
Sachin


On Wed, Mar 13, 2024 at 8:02 AM Hang Ruan  wrote:

> Hi, Sachin.
>
> I use the command `jar -tf flink-dist-1.18.0.jar| grep OutputTag` to make
> sure that this class is packaged correctly.
> I think you should check your own jar to make sure this class is not
> packaged in your jar.
>
> Best,
> Hang
>
> Sachin Mittal  于2024年3月12日周二 20:29写道:
>
>> I miss wrote.  It’s version 1.18.
>>
>> This is latest and works locally but not on aws emr and I get class not
>> found exception.
>>
>>
>>
>> On Tue, 12 Mar 2024 at 1:25 PM, Zhanghao Chen 
>> wrote:
>>
>>> Hi Sachin,
>>>
>>> Flink 1.8 series have already been out of support, have you tried with a
>>> newer version of Flink?
>>> --
>>> *From:* Sachin Mittal 
>>> *Sent:* Tuesday, March 12, 2024 14:48
>>> *To:* user@flink.apache.org 
>>> *Subject:* Facing ClassNotFoundException:
>>> org.apache.flink.api.common.ExecutionConfig on EMR
>>>
>>> Hi,
>>> We have installed a flink cluster version 1.8.0 on AWS EMR.
>>> However when we submit a job we get the following error:
>>>
>>> (Do note that when we submit the same job on a local instance of Flink
>>> 1.8.1 it is working fine.
>>> The fat jar we submit has all the flink dependencies from 1.8.0
>>> including the class org.apache.flink.api.common.ExecutionConfig).
>>>
>>> Caused by: java.lang.RuntimeException: 
>>> org.apache.flink.runtime.client.JobInitializationException: Could not start 
>>> the JobMaster.
>>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>>> at 
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
>>> Caused by: org.apache.flink.runtime.client.JobInitializationException: 
>>> Could not start the JobMaster.
>>> at 
>>> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>>> at java.base/java.lang.Thread.run(Thread.java:840)
>>> Caused by: java.util.concurrent.CompletionException: 
>>> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
>>> org.apache.flink.api.common.ExecutionConfig
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>>> ... 3 more
>>&

Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
Hi Hang,
Once I exclude flink-core from the fat jar I get this error:
I believe org.apache.flink.util.OutputTag is part of flink-core itself.

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/flink/util/OutputTag
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:467)
at org.apache.hadoop.util.RunJar.run(RunJar.java:321)
at org.apache.hadoop.util.RunJar.main(RunJar.java:241)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.util.OutputTag
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
... 4 more


Thanks

Sachin



On Tue, Mar 12, 2024 at 2:11 PM Sachin Mittal  wrote:

> Ok. Actually it’s version 1.18. I will try to remove flink-core from the
> fat jar.
>
> On Tue, 12 Mar 2024 at 1:51 PM, Hang Ruan  wrote:
>
>> Hi, Sachin.
>>
>> This error occurs when there is class conflict. There is no need to
>> package flink-core in your own jar. It is already contained in flink-dist.
>> And Flink version 1.8 is too old. It is better to update your flink
>> version.
>>
>> Best,
>> Hang
>>
>>
>>
>> Sachin Mittal  于2024年3月12日周二 16:04写道:
>>
>>> Hi,
>>> We have installed a flink cluster version 1.8.0 on AWS EMR.
>>> However when we submit a job we get the following error:
>>>
>>> (Do note that when we submit the same job on a local instance of Flink
>>> 1.8.1 it is working fine.
>>> The fat jar we submit has all the flink dependencies from 1.8.0
>>> including the class org.apache.flink.api.common.ExecutionConfig).
>>>
>>> Caused by: java.lang.RuntimeException: 
>>> org.apache.flink.runtime.client.JobInitializationException: Could not start 
>>> the JobMaster.
>>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>>> at 
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>>> at 
>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
>>> Caused by: org.apache.flink.runtime.client.JobInitializationException: 
>>> Could not start the JobMaster.
>>> at 
>>> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>>> at java.base/java.lang.Thread.run(Thread.java:840)
>>> Caused by: java.util.concurrent.CompletionException: 
>>> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
>>> org.apache.flink.api.common.ExecutionConfig
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>>> at 
>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>>> ... 3 more
>>> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: 
>>> org.apache.flink.api

Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
Ok. Actually it’s version 1.18. I will try to remove flink-core from the
fat jar.

On Tue, 12 Mar 2024 at 1:51 PM, Hang Ruan  wrote:

> Hi, Sachin.
>
> This error occurs when there is class conflict. There is no need to
> package flink-core in your own jar. It is already contained in flink-dist.
> And Flink version 1.8 is too old. It is better to update your flink
> version.
>
> Best,
> Hang
>
>
>
> Sachin Mittal  于2024年3月12日周二 16:04写道:
>
>> Hi,
>> We have installed a flink cluster version 1.8.0 on AWS EMR.
>> However when we submit a job we get the following error:
>>
>> (Do note that when we submit the same job on a local instance of Flink
>> 1.8.1 it is working fine.
>> The fat jar we submit has all the flink dependencies from 1.8.0 including
>> the class org.apache.flink.api.common.ExecutionConfig).
>>
>> Caused by: java.lang.RuntimeException: 
>> org.apache.flink.runtime.client.JobInitializationException: Could not start 
>> the JobMaster.
>>  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>>  at 
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>>  at 
>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>>  at 
>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>>  at 
>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>>  at 
>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>>  at 
>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
>> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
>> not start the JobMaster.
>>  at 
>> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>>  at 
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>>  at 
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>>  at java.base/java.lang.Thread.run(Thread.java:840)
>> Caused by: java.util.concurrent.CompletionException: 
>> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.common.ExecutionConfig
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>>  ... 3 more
>> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.common.ExecutionConfig
>>  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>>  at 
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>>  ... 3 more
>> Caused by: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.common.ExecutionConfig
>>  at 
>> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>>  at 
>> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>>  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
>>  at java.base/java.lang.Class.forName0(Native Method)
>>  at java.base/java.lang.Class.forName(Class.java:467)
>>
>>


Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
I miswrote; it's version 1.18.

That is the latest release, and it works locally but not on AWS EMR, where I
get the ClassNotFoundException.
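A quick way to confirm which version the EMR side is actually running,
assuming the usual /usr/lib/flink install location (adjust the path, and the
REST address and port, to your deployment):

ls /usr/lib/flink/lib/flink-dist*
# the jar name carries the version, e.g. flink-dist-1.18.1.jar

# or ask the running JobManager over its REST endpoint:
curl http://<jobmanager-host>:8081/config

The job jar should be compiled against the same minor version that the
cluster reports.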



On Tue, 12 Mar 2024 at 1:25 PM, Zhanghao Chen 
wrote:

> Hi Sachin,
>
> The Flink 1.8 series has already been out of support; have you tried a
> newer version of Flink?
> --
> *From:* Sachin Mittal 
> *Sent:* Tuesday, March 12, 2024 14:48
> *To:* user@flink.apache.org 
> *Subject:* Facing ClassNotFoundException:
> org.apache.flink.api.common.ExecutionConfig on EMR
>
> Hi,
> We have installed a flink cluster version 1.8.0 on AWS EMR.
> However when we submit a job we get the following error:
>
> (Do note that when we submit the same job on a local instance of Flink
> 1.8.1 it is working fine.
> The fat jar we submit has all the flink dependencies from 1.8.0 including
> the class org.apache.flink.api.common.ExecutionConfig).
>
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
>   at 
> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>   at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
> org.apache.flink.api.common.ExecutionConfig
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>   ... 3 more
> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: 
> org.apache.flink.api.common.ExecutionConfig
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>   ... 3 more
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.api.common.ExecutionConfig
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>   at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
>   at java.base/java.lang.Class.forName0(Native Method)
>   at java.base/java.lang.Class.forName(Class.java:467)
>
>


Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
Hi,
We have installed a Flink cluster, version 1.8.0, on AWS EMR.
However, when we submit a job we get the following error:

(Do note that when we submit the same job on a local instance of Flink
1.8.1 it works fine.
The fat jar we submit has all the Flink dependencies from 1.8.0, including
the class org.apache.flink.api.common.ExecutionConfig).

Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobInitializationException: Could not
start the JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.client.JobInitializationException:
Could not start the JobMaster.
at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.CompletionException:
java.lang.RuntimeException: java.lang.ClassNotFoundException:
org.apache.flink.api.common.ExecutionConfig
at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
... 3 more
Caused by: java.lang.RuntimeException:
java.lang.ClassNotFoundException:
org.apache.flink.api.common.ExecutionConfig
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
... 3 more
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.api.common.ExecutionConfig
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:467)
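Since the report above says the fat jar bundles
org.apache.flink.api.common.ExecutionConfig, one quick check is to list both
the submitted jar and the cluster's own distribution; the jar name below is a
placeholder and /usr/lib/flink is the typical EMR install path, so adjust
both as needed:

# Is ExecutionConfig bundled inside the submitted fat jar?
jar tf your-job-fat.jar | grep 'org/apache/flink/api/common/ExecutionConfig.class'

# The cluster's flink-dist should already provide it:
jar tf /usr/lib/flink/lib/flink-dist-*.jar | grep 'org/apache/flink/api/common/ExecutionConfig.class'

If the class shows up in both places, that is the kind of class conflict
pointed out earlier in the thread, and rebuilding with the Flink runtime
dependencies excluded from the fat jar is the usual first step.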