[jira] [Created] (FLINK-33761) Snowflake as JDBC source

2023-12-05 Thread Boris Litvak (Jira)
Boris Litvak created FLINK-33761:


 Summary: Snowflake as JDBC source
 Key: FLINK-33761
 URL: https://issues.apache.org/jira/browse/FLINK-33761
 Project: Flink
  Issue Type: Bug
Reporter: Boris Litvak






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Distributed cache support in async io

2023-12-05 Thread Yaming Ma
Hi all,

I asked this question on the user mailing list before but got no response,
so I am trying this list in the hope of getting an answer here :-).

I noticed that the distributed cache is not supported in async I/O. Is there
any reason for that? It's understandable that the state APIs are not
supported, since the data might be changed by worker threads, but I don't see
why the distributed cache is not supported either.

org.apache.flink.streaming.api.functions.async.RichAsyncFunction

    @Override
    public DistributedCache getDistributedCache() {
        throw new UnsupportedOperationException(
                "Distributed cache is not supported in rich async functions.");
    }

Here's some background:
My task needs to query a Kerberos-secured HBase cluster. It registers the
Kerberos configuration file and the keytab file via the distributed cache.
For performance reasons, the HBase query is done in an async I/O function,
and the distributed cache is not available there, so this setup no longer
works.

Thanks,
Yaming


[jira] [Created] (FLINK-33760) Group Window agg has different result when only consuming -D records while using or not using minibatch

2023-12-05 Thread xuyang (Jira)
xuyang created FLINK-33760:
--

 Summary: Group Window agg has different result when only consuming 
-D records while using or not using minibatch
 Key: FLINK-33760
 URL: https://issues.apache.org/jira/browse/FLINK-33760
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: xuyang


Add the following test to AggregateITCase to reproduce this bug.

 
{code:java}
@Test
def test(): Unit = {
  val upsertSourceCurrencyData = List(
    changelogRow("-D", 1.bigDecimal, "a"),
    changelogRow("-D", 1.bigDecimal, "b"),
    changelogRow("-D", 1.bigDecimal, "b")
  )

  val upsertSourceDataId = registerData(upsertSourceCurrencyData);
  tEnv.executeSql(s"""
                     |CREATE TABLE T (
                     | `a` DECIMAL(32, 8),
                     | `d` STRING,
                     | proctime as proctime()
                     |) WITH (
                     | 'connector' = 'values',
                     | 'data-id' = '$upsertSourceDataId',
                     | 'changelog-mode' = 'I,UA,UB,D',
                     | 'failing-source' = 'true'
                     |)
                     |""".stripMargin)

  // The SQL string is concatenated so that it stays a valid single-line literal.
  val sql =
    "SELECT max(a), sum(a), min(a), TUMBLE_START(proctime, INTERVAL '0.005' SECOND), " +
      "TUMBLE_END(proctime, INTERVAL '0.005' SECOND), d FROM T GROUP BY d, " +
      "TUMBLE(proctime, INTERVAL '0.005' SECOND)"

  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
  env.execute()

  // Use the result precision/scale calculated for sum and don't override with the one calculated
  // for plus()/minus(), which results in losing a decimal digit.
  val expected = List("6.41671935,65947.230719357070,609.0286740370369970")
  assertEquals(expected, sink.getRetractResults.sorted)
}
{code}
When MiniBatch is ON, the result is `List()`.

 

When MiniBatch is OFF, the result is 
`List(null,-1.,null,2023-12-06T11:29:21.895,2023-12-06T11:29:21.900,a)`.

 





[jira] [Created] (FLINK-33759) flink parquet writer support write nested array or map type

2023-12-05 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-33759:
---

 Summary: flink parquet writer support write nested array or map 
type
 Key: FLINK-33759
 URL: https://issues.apache.org/jira/browse/FLINK-33759
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Cai Liuyang


When we use the flink-parquet format to write a Map[] type (which will
be read by a Spark job), we encounter an exception:
`org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the
field should be ommited completely instead`. After reviewing the code, we found
that flink-parquet doesn't support writing nested arrays or maps, because
[ArrayWriter](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L437)
and
[MapWriter](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java#L391)
don't implement the `public void write(ArrayData arrayData, int ordinal) {}` function.





[RESULT][VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-12-05 Thread Xia Sun
Dear developers,

FLIP-379: Dynamic source parallelism inference for batch jobs[1] has been
voted on and accepted in this thread [2].

The proposal received 6 approving binding votes and no disapproving votes:

- Zhu Zhu (binding)
- Lijie Wang (binding)
- Rui Fan (binding)
- Etienne Chauchot (binding)
- Leonard Xu (binding)
- Jingsong Li (binding)

Thanks to all involved.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
[2] https://lists.apache.org/thread/g03m2r8dodz6gn8jgf36mvq60h1tsnqg

Best,
Xia


Re: [jira] [Created] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-05 Thread Darin Amos
I apologize, I was a little off with my description. It's been a while
since I looked at this code, but I have refreshed my memory.

The line I referred to earlier was correct though. This operator only
processes records in a file split while the operator is idle, meaning there
are no more incoming file splits. After every read it checks if there are
any incoming file splits before continuing to read from the split. If there
is indeed a new inbound file split, the loop will exit and it will re-queue
itself to continue processing records later. You can see that here

.
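The behaviour described above (read a record, check for pending mail, yield and re-queue if there is any) can be sketched with a toy single-threaded mailbox. This is only an illustration of that description, not Flink's code: `CooperativeReader`, `start`, `enqueue` and `drain` are hypothetical names, and the quoted messages below disagree on whether the real operator actually checks after every record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Toy single-threaded mailbox; every name here is hypothetical, none is a Flink class. */
final class CooperativeReader {
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private final Iterator<String> records;
    final List<String> log = new ArrayList<>();

    CooperativeReader(List<String> split) {
        this.records = split.iterator();
    }

    /** Schedules the reading of the split as a piece of mail. */
    void start() {
        mailbox.add(this::readSplit);
    }

    /** Enqueues other work, for example a new split notification or a checkpoint. */
    void enqueue(String name) {
        mailbox.add(() -> log.add(name));
    }

    /** Reads records only while no other mail is waiting, then yields and re-queues itself. */
    private void readSplit() {
        while (records.hasNext()) {
            log.add("read " + records.next());
            if (!mailbox.isEmpty()) {
                // No longer idle: let the pending mail run first and continue later.
                mailbox.add(this::readSplit);
                return;
            }
        }
    }

    /** Runs mail in FIFO order until the mailbox is empty. */
    void drain() {
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
    }

    public static void main(String[] args) {
        CooperativeReader reader = new CooperativeReader(List.of("a", "b", "c"));
        reader.start();
        reader.enqueue("checkpoint");
        reader.drain();
        System.out.println(reader.log);
    }
}
```

In this toy model the checkpoint mail runs after the first record instead of waiting for the whole split, which is the property the thread is arguing about.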

When the loop is interrupted by a checkpoint barrier, snapshotState(...) is
called and the reader grabs the state from the provided Format. In the
normal case the state is simply the split offset (current progress
indicator), in more complex scenarios you can create your own format class
and provide whatever serializable state you desire. In my case we store
additional metadata about the progress of the reader.

On state restore, the operator calls loadSplit and it will call *reopen* on
the format rather than open, passing the checkpoint state so you can
continue from where you left off. You can see that here

.
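The open/reopen distinction can be illustrated with a small in-memory format whose checkpoint state is just the current offset. This is a sketch under simplifying assumptions: `OffsetTrackingFormat` is a hypothetical class, the split is a plain list, and the method names only mirror the ones mentioned in this message.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy format over an in-memory split; hypothetical, it only mirrors the open/reopen idea. */
final class OffsetTrackingFormat {
    private List<String> split;
    private int offset;

    /** Fresh start: read the split from the beginning. */
    void open(List<String> split) {
        this.split = split;
        this.offset = 0;
    }

    /** Restore: resume from the offset that was captured in a checkpoint. */
    void reopen(List<String> split, int checkpointedOffset) {
        this.split = split;
        this.offset = checkpointedOffset;
    }

    boolean reachedEnd() {
        return offset >= split.size();
    }

    String nextRecord() {
        return split.get(offset++);
    }

    /** State handed to the checkpoint: here simply the current offset. */
    int getCurrentState() {
        return offset;
    }

    public static void main(String[] args) {
        List<String> split = List.of("a", "b", "c", "d");

        OffsetTrackingFormat first = new OffsetTrackingFormat();
        first.open(split);
        first.nextRecord();
        first.nextRecord();
        int state = first.getCurrentState(); // taken when the checkpoint happens

        OffsetTrackingFormat restored = new OffsetTrackingFormat();
        restored.reopen(split, state);
        List<String> rest = new ArrayList<>();
        while (!restored.reachedEnd()) {
            rest.add(restored.nextRecord());
        }
        System.out.println(rest);
    }
}
```

A custom format could return richer serializable state than an offset, as described above; the restore path stays the same.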

Cheers

Darin




On Tue, Dec 5, 2023 at 8:08 PM Darin Amos  wrote:

> The way I understand this loop is that the ContinuousFileReaderOperator
> only processes records in the background while the operator is idle, i.e.
> while it's not receiving any records.
>
> At the very bottom of that loop here
> 
> it exits if the executor is no longer idle, i.e. there are incoming records.
>
> If you look here
> ,
> the operator supports checkpointable input splits, meaning it'll save its
> place within a file split. This would only be possible if the reader can be
> interrupted in the middle of a split. I have written custom splits that do
> this exactly.
>
> Darin
>
> On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph 
> wrote:
>
>> This is the loop - code reference
>> <
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346
>> >,
>> where it fetches all records from the split, and then only the
>> MailboxProcessor gets control to check other mail. This loop was
>> introduced
>> here
>> <
>> https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f
>> >
>> .
>>
>>
>>
>>
>> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos
>> wrote:
>>
>> > I thought for sure this was already the existing behavior with this
>> > operator. Does it not check the mailbox executor after every record
>> read?
>> >
>> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) 
>> > wrote:
>> >
>> > > Prabhu Joseph created FLINK-33753:
>> > > -
>> > >
>> > >  Summary: ContinuousFileReaderOperator consume records as
>> > mini
>> > > batch
>> > >  Key: FLINK-33753
>> > >  URL:
>> https://issues.apache.org/jira/browse/FLINK-33753
>> > >  Project: Flink
>> > >   Issue Type: Improvement
>> > > Affects Versions: 1.18.0
>> > > Reporter: Prabhu Joseph
>> > >
>> > >
>> > > The ContinuousFileReaderOperator reads and collects the records from a
>> > > split in a loop. If the split size is large, then the loop will take
>> more
>> > > time, and then the mailbox executor won't have a chance to process the
>> > > checkpoint barrier. This leads to checkpoint timing out.
>> > > ContinuousFileReaderOperator could be improved to consume the records
>> in
>> > a
>> > > mini batch, similar to Hudi's StreamReadOperator (
>> > > https://issues.apache.org/jira/browse/HUDI-2485).
>> > >
>> > >
>> > >
>> > > --
>> > > This message was sent by Atlassian Jira
>> > > (v8.20.10#820010)
>> > >
>> >
>>
>


Re: [jira] [Created] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-05 Thread Darin Amos
The way I understand this loop is that the ContinuousFileReaderOperator
only processes records in the background while the operator is idle, i.e.
while it's not receiving any records.

At the very bottom of that loop here

it exits if the executor is no longer idle, i.e. there are incoming records.

If you look here
,
the operator supports checkpointable input splits, meaning it'll save its
place within a file split. This would only be possible if the reader can be
interrupted in the middle of a split. I have written custom splits that do
this exactly.

Darin

On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph 
wrote:

> This is the loop - code reference
> <
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346
> >,
> where it fetches all records from the split, and then only the
> MailboxProcessor gets control to check other mail. This loop was introduced
> here
> <
> https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f
> >
> .
>
>
>
>
> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos
> wrote:
>
> > I thought for sure this was already the existing behavior with this
> > operator. Does it not check the mailbox executor after every record read?
> >
> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) 
> > wrote:
> >
> > > Prabhu Joseph created FLINK-33753:
> > > -
> > >
> > >  Summary: ContinuousFileReaderOperator consume records as
> > mini
> > > batch
> > >  Key: FLINK-33753
> > >  URL:
> https://issues.apache.org/jira/browse/FLINK-33753
> > >  Project: Flink
> > >   Issue Type: Improvement
> > > Affects Versions: 1.18.0
> > > Reporter: Prabhu Joseph
> > >
> > >
> > > The ContinuousFileReaderOperator reads and collects the records from a
> > > split in a loop. If the split size is large, then the loop will take
> more
> > > time, and then the mailbox executor won't have a chance to process the
> > > checkpoint barrier. This leads to checkpoint timing out.
> > > ContinuousFileReaderOperator could be improved to consume the records
> in
> > a
> > > mini batch, similar to Hudi's StreamReadOperator (
> > > https://issues.apache.org/jira/browse/HUDI-2485).
> > >
> > >
> > >
> > > --
> > > This message was sent by Atlassian Jira
> > > (v8.20.10#820010)
> > >
> >
>


Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-12-05 Thread Mason Chen
Hi Rui,

Sorry for the late reply. I was suggesting that perhaps we could do some
testing with Kubernetes wrt configuring values for the exponential restart
strategy. We've noticed that the default strategy in 1.17 caused a lot of
requests to the K8s API server for unstable deployments.

However, people in different Kubernetes setups will have different limits
so it would be challenging to provide a general benchmark. Another thing I
found helpful in the past is to refer to Kubernetes--for example, the
default strategy is exponential for pod restarts and we could draw
inspiration from what they have set as a general purpose default config.

Best,
Mason

On Sun, Nov 19, 2023 at 9:43 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi David and Mason,
>
> Thanks for your feedback!
>
> To David:
>
> > Given that the new default feels more complex than the current behavior,
> if we decide to do this I think it will be important to include the
> rationale you've shared in the documentation.
>
> That makes sense to me. I will add the related doc if we
> update the default strategy.
>
> To Mason:
>
> > I suppose we could do some benchmarking on what works well for the
> resource providers that Flink relies on e.g. Kubernetes. Based on
> conferences and blogs,
> > it seems most people are relying on Kubernetes to deploy Flink and the
> restart strategy has a large dependency on how well Kubernetes can scale to
> requests to redeploy the job.
>
> Sorry, I didn't understand what type of benchmarking
> we should do, could you elaborate on it? Thanks a lot.
>
> Best,
> Rui
>
> On Sat, Nov 18, 2023 at 3:32 AM Mason Chen  wrote:
>
>> Hi Rui,
>>
>> I suppose we could do some benchmarking on what works well for the
>> resource providers that Flink relies on e.g. Kubernetes. Based on
>> conferences and blogs, it seems most people are relying on Kubernetes to
>> deploy Flink and the restart strategy has a large dependency on how well
>> Kubernetes can scale to requests to redeploy the job.
>>
>> Best,
>> Mason
>>
>> On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
>> wrote:
>>
>>> Rui,
>>>
>>> I don't have any direct experience with this topic, but given the
>>> motivation you shared, the proposal makes sense to me. Given that the new
>>> default feels more complex than the current behavior, if we decide to do
>>> this I think it will be important to include the rationale you've shared in
>>> the documentation.
>>>
>>> David
>>>
>>> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>>>
 Hi dear flink users and devs:

 FLIP-364[1] intends to make some improvements to restart-strategy
 and discuss updating some of the default values of exponential-delay,
 and whether exponential-delay can be used as the default
 restart-strategy.
 After discussing at dev mail list[2], we hope to collect more feedback
 from Flink users.

 # Why does the default restart-strategy need to be updated?

 If checkpointing is enabled, the default value is fixed-delay with
 Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
 the job will restart infinitely with high frequency when a job
 continues to fail.

 When the Kafka cluster fails, a large number of flink jobs will be
 restarted frequently. After the kafka cluster is recovered, a large
 number of high-frequency restarts of flink jobs may cause the
 kafka cluster to avalanche again.

 We are considering exponential-delay as the default strategy for
 a couple of reasons:

 - The exponential-delay can reduce the restart frequency when
   a job continues to fail.
 - It can restart a job quickly when a job fails occasionally.
 - The restart-strategy.exponential-delay.jitter-factor can avoid
   restarting multiple jobs at the same time. It's useful to prevent
   avalanches.

 # What are the current default values[4] of exponential-delay?

 restart-strategy.exponential-delay.initial-backoff : 1s
 restart-strategy.exponential-delay.backoff-multiplier : 2.0
 restart-strategy.exponential-delay.jitter-factor : 0.1
 restart-strategy.exponential-delay.max-backoff : 5 min
 restart-strategy.exponential-delay.reset-backoff-threshold : 1h

 backoff-multiplier=2 means that the delay time of each restart
 will be doubled. The delay times are:
 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.

 The delay time increases rapidly, which will affect the recovery
 time for flink jobs.
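The arithmetic behind that sequence can be sketched as follows. This is a simplified model, not Flink's implementation: `BackoffSchedule` and `delaysMillis` are hypothetical names, and the jitter factor and the reset-backoff-threshold are deliberately ignored. Each delay is min(initial * multiplier^n, max).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an exponential-delay schedule; jitter and reset threshold are ignored. */
final class BackoffSchedule {

    /** Returns the first {@code attempts} delays in milliseconds, capped at {@code maxMillis}. */
    static List<Long> delaysMillis(long initialMillis, double multiplier, long maxMillis, int attempts) {
        List<Long> delays = new ArrayList<>();
        double current = initialMillis;
        for (int i = 0; i < attempts; i++) {
            delays.add(Math.min(Math.round(current), maxMillis));
            // Cap the running value as well so it cannot grow without bound.
            current = Math.min(current * multiplier, maxMillis);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Defaults quoted above: 1 s initial backoff, multiplier 2.0, 5 min cap.
        System.out.println(delaysMillis(1_000, 2.0, 300_000, 11));
    }
}
```

With the defaults listed above, the eleven values are 1 s through 256 s followed by two capped 300 s delays, matching the sequence in the text. Passing a different multiplier and cap shows how quickly a smaller multiplier flattens the curve.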

 # Option improvements

 We think the backoff-multiplier between 1 and 2 is more sensible,
 such as:

 restart-strategy.exponential-delay.backoff-multiplier : 1.2
 restart-strategy.exponential-delay.max-backoff : 1 min

 After updating, the delay times are:

 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 

[jira] [Created] (FLINK-33758) Implement restore tests for TemporalSort node

2023-12-05 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-33758:
--

 Summary: Implement restore tests for TemporalSort node
 Key: FLINK-33758
 URL: https://issues.apache.org/jira/browse/FLINK-33758
 Project: Flink
  Issue Type: Sub-task
Reporter: Jim Hughes








[jira] [Created] (FLINK-33757) Implement restore tests for Rank node

2023-12-05 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-33757:
--

 Summary: Implement restore tests for Rank node
 Key: FLINK-33757
 URL: https://issues.apache.org/jira/browse/FLINK-33757
 Project: Flink
  Issue Type: Sub-task
Reporter: Jim Hughes
Assignee: Jim Hughes








[jira] [Created] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-05 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-33756:
--

 Summary: Missing record with CUMULATE/HOP windows using an 
optimization
 Key: FLINK-33756
 URL: https://issues.apache.org/jira/browse/FLINK-33756
 Project: Flink
  Issue Type: Bug
Reporter: Jim Hughes


I have seen an optimization cause a window to fail to emit a record.

With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to true, 
the configuration AggregatePhaseStrategy.TWO_PHASE set, using a HOP or CUMULATE 
window with an offset, a record can be sent which causes one of the multiple 
active windows to fail to emit a record.

The linked code modifies the `WindowAggregateJsonITCase` to demonstrate the case. 
 
 
The test `testDistinctSplitDisabled` shows the expected behavior.  The test 
`testDistinctSplitEnabled` tests the above configurations and shows that one 
record is missing from the output.  





Re: [DISCUSS] Resolve diamond inheritance of Sink.createWriter

2023-12-05 Thread Márton Balassi
Thanks, Peter. Given the discussion I also agree that the consensus is to
move towards the mixin interface approach (and accept its disadvantages
given its advantages).

+1 for the general direction of your proposed code change in
https://github.com/apache/flink/pull/23876.
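
For readers new to the thread, the mixin idea and the type-safety cost Péter mentions can be sketched with simplified stand-in interfaces. Everything below is hypothetical and much smaller than the real Sink API: `SupportsStatefulWrite` borrows the name proposed by Becket, while `restoreWriter`, `ListSink` and `writerFor` are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins for the sink interfaces discussed in this thread. */
final class MixinSinkSketch {

    interface SinkWriter<T> {
        void write(T element);
    }

    interface Sink<T> {
        SinkWriter<T> createWriter();
    }

    /** Mixin: adds restore support without overriding createWriter, so no diamond arises. */
    interface SupportsStatefulWrite<T, S> {
        SinkWriter<T> restoreWriter(List<S> recoveredState);
    }

    static final class ListSink implements Sink<String>, SupportsStatefulWrite<String, String> {
        final List<String> out = new ArrayList<>();

        @Override
        public SinkWriter<String> createWriter() {
            return out::add;
        }

        @Override
        public SinkWriter<String> restoreWriter(List<String> recoveredState) {
            out.addAll(recoveredState);
            return out::add;
        }
    }

    /** The framework side discovers the capability at runtime with an instanceof check. */
    @SuppressWarnings("unchecked")
    static <T> SinkWriter<T> writerFor(Sink<T> sink, List<?> recoveredState) {
        if (sink instanceof SupportsStatefulWrite && !recoveredState.isEmpty()) {
            // Unchecked casts: nothing ties the mixin's type parameters to Sink<T>.
            SupportsStatefulWrite<T, Object> stateful = (SupportsStatefulWrite<T, Object>) sink;
            return stateful.restoreWriter((List<Object>) recoveredState);
        }
        return sink.createWriter();
    }

    public static void main(String[] args) {
        ListSink sink = new ListSink();
        SinkWriter<String> writer = writerFor(sink, List.of("restored"));
        writer.write("new");
        System.out.println(sink.out);
    }
}
```

The unchecked casts in `writerFor` are the drawback raised in the quoted message: the compiler cannot verify that the generic types of the mixin and of the sink agree.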

On Tue, Dec 5, 2023 at 3:44 PM Péter Váry 
wrote:

> It seems to me we have a consensus to move forward with the mixin approach.
> I hope that everyone is aware that with the mixin interfaces we lose the
> opportunity of the strong type checks. This will be especially painful for
> generic types, where we will not have a way to ensure that the generic
> types are correctly synchronized between the different interfaces, even on
> DAG creation time.
>
> Even with this drawback, I like this approach too, so +1 from my side.
>
> As a first step in the direction of the mixin approach, we can remove the
> specific implementations of the `createWriter` methods from the
> `StatefulSink` and the `TwoPhaseCommittingSink` interfaces (and replace them
> with an instanceof check where needed).
> - This would remove the diamond inheritance and enable us to create default
> methods for backward compatibility.
> - This would not break the API, as the same method with wider return value
> will be inherited from the `Sink` interface.
>
> Since the proposed changes might be easier to understand in code, I have
> created a new PR: https://github.com/apache/flink/pull/23876
> The PR has 2 commits:
> - Reverting the previous change - non-clean, since there were some
> additional fixes on the tests -
>
> https://github.com/apache/flink/pull/23876/commits/c7625d5fa62a6e9a182f39f53fb7e5626105f3b0
> - The new change with mixin approach, and deprecation -
>
> https://github.com/apache/flink/pull/23876/commits/99ec936966af527598ca49712c1263bc4aa03c15
>
> Thanks,
> Peter
>
> weijie guo wrote (on Tue, Dec 5, 2023, 8:01):
>
> > Thanks Martijn for driving this!
> >
> > I'm +1  to reverting the breaking change.
> >
> > > For new functionality or changes we can make easily, we should switch
> to
> > the decorative/mixin interface approach used successfully in the source
> and
> > table interfaces.
> >
> > I like the way of switching to mixin interface.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Becket Qin wrote on Tue, Dec 5, 2023 at 14:50:
> >
> > > I am with Gyula about fixing the current SinkV2 API.
> > >
> > > A SinkV3 seems not necessary because we are not changing the
> fundamental
> > > design of the API. Hopefully we can modify the interface structure a
> > little
> > > bit to make it similar to the Source while still keep the backwards
> > > compatibility.
> > > For example, one approach is:
> > >
> > > - Add snapshotState(int checkpointId) and precommit() methods to the
> > > SinkWriter with default implementation doing nothing. Deprecate
> > > StatefulSinkWriter and PrecommittingSinkWriter.
> > > - Add two mixin interfaces of SupportsStatefulWrite and
> > > SupportsTwoPhaseCommit. Deprecate the StatefulSink and
> > > TwoPhaseCommittingSink.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Dec 4, 2023 at 7:25 PM Gyula Fóra 
> wrote:
> > >
> > > > Hi All!
> > > >
> > > > Based on the discussion above, I feel that the most reasonable
> approach
> > > > from both developers and users perspective at this point is what
> Becket
> > > > lists as Option 1:
> > > >
> > > > Revert the naming change to the backward compatible version and
> accept
> > > that
> > > > the names are not perfect (treat it as legacy).
> > > >
> > > > On a different note, I agree that the current sink v2 interface is
> very
> > > > difficult to evolve and structuring the interfaces the way they are
> now
> > > is
> > > > not a good design in the long run.
> > > > For new functionality or changes we can make easily, we should switch
> > to
> > > > the decorative/mixin interface approach used successfully in the
> source
> > > and
> > > > table interfaces. Let's try to do this as much as possible within the
> > v2
> > > > and compatibility boundaries and we should only introduce a v3 if we
> > > really
> > > > must.
> > > >
> > > > So from my side, +1 to reverting the naming to keep backward
> > > compatibility.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > >
> > > > On Fri, Dec 1, 2023 at 10:43 AM Péter Váry <
> > peter.vary.apa...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks Becket for your reply!
> > > > >
> > > > > *On Option 1:*
> > > > > - I personally consider API inconsistencies more important, since
> > they
> > > > will
> > > > > remain with us "forever", but this is up to the community. I can
> > > > implement
> > > > > whichever solution we decide upon.
> > > > >
> > > > > *Option 2:*
> > > > > - I don't think this specific issue merits a rewrite, but if we
> > decide
> > > to
> > > > > change our approach, then it's a different story.
> > > > >
> > > > > *Evolvability:*
> > > > > This discussion reminds me of a similar discussion on FLIP-372 

Re: [jira] [Created] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-05 Thread Prabhu Joseph
This is the loop - code reference
,
where it fetches all records from the split, and then only the
MailboxProcessor gets control to check other mail. This loop was introduced
here

.




On Tue, Dec 5, 2023 at 9:00 PM Darin Amos 
wrote:

> I thought for sure this was already the existing behavior with this
> operator. Does it not check the mailbox executor after every record read?
>
> On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) 
> wrote:
>
> > Prabhu Joseph created FLINK-33753:
> > -
> >
> >  Summary: ContinuousFileReaderOperator consume records as
> mini
> > batch
> >  Key: FLINK-33753
> >  URL: https://issues.apache.org/jira/browse/FLINK-33753
> >  Project: Flink
> >   Issue Type: Improvement
> > Affects Versions: 1.18.0
> > Reporter: Prabhu Joseph
> >
> >
> > The ContinuousFileReaderOperator reads and collects the records from a
> > split in a loop. If the split size is large, then the loop will take more
> > time, and then the mailbox executor won't have a chance to process the
> > checkpoint barrier. This leads to checkpoint timing out.
> > ContinuousFileReaderOperator could be improved to consume the records in
> a
> > mini batch, similar to Hudi's StreamReadOperator (
> > https://issues.apache.org/jira/browse/HUDI-2485).
> >
> >
> >
> > --
> > This message was sent by Atlassian Jira
> > (v8.20.10#820010)
> >
>


Re: [DISCUSS] Release flink-connector-parent v1.01

2023-12-05 Thread Etienne Chauchot

Hi Péter,

My answers are inline


Best

Etienne


On 05/12/2023 at 05:27, Péter Váry wrote:

Hi Etienne,

Which branch would you cut the release from?

the parent_pom branch (consisting of a single maven pom file)


I find the flink-connector-parent branches confusing.

If I merge a PR to the ci_utils branch, would it immediately change the CI
workflow of all of the connectors?


The ci_utils branch is basically one ci.yml workflow. _testing.yml and the
maven test-project both exist to test the ci.yml workflow and to show
connector authors what it can do.


Since the connectors' workflows reference ci.yml as
apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils,
merging changes to ci.yml will change the CI of all the connectors' repos.




If I merge something to the release_utils branch, would it immediately
change the release process of all of the connectors?
I don't know yet how the release_utils scripts are integrated with the
connectors' code.


I would like to add the possibility of creating Python packages for the
connectors [1]. This would consist of some common code, which should reside
in flink-connector-parent, like:
- scripts for running Python tests - test infra. I expect that this would
evolve over time
- ci workflow - this would be more slow moving, but might change if the
infra is changing
- release scripts - this would be slow moving, but might change too.

I think we should have a release for all of the above components, so the
connectors could move forward at their own pace.



I think it is quite out of the scope of this release: here we are only 
talking about releasing a parent pom maven file for the connectors.




What do you think?

Thanks,
Péter

[1] https://issues.apache.org/jira/browse/FLINK-33528

On Thu, Nov 30, 2023, 16:55 Etienne Chauchot  wrote:


Thanks Sergey for your vote. Indeed I have listed only the PRs merged
since last release but there are these 2 open PRs that could be worth
reviewing/merging before release.

https://github.com/apache/flink-connector-shared-utils/pull/25

https://github.com/apache/flink-connector-shared-utils/pull/20

Best

Etienne


On 30/11/2023 at 11:12, Sergey Nuyanzin wrote:

thanks for volunteering Etienne

+1 for releasing
however there is one more PR to enable custom jvm flags for connectors
in similar way it is done in Flink main repo for modules
It will simplify a bit support for java 17

could we have this as well in the coming release?



On Wed, Nov 29, 2023 at 11:40 AM Etienne Chauchot
wrote:


Hi all,

I would like to discuss making a v1.0.1 release of

flink-connector-parent.

Since last release, there were only 2 changes:

- https://github.com/apache/flink-connector-shared-utils/pull/19
(spotless addition)

- https://github.com/apache/flink-connector-shared-utils/pull/26
(surefire configuration)

The new release would bring the ability to skip some tests in the
connectors and among other things skip the archunit tests. It is
important for connectors to skip archunit tests when tested against a
version of Flink that changes the archunit rules leading to a change of
the violation store. As there is only one violation store and the
connector needs to be tested against last 2 minor Flink versions, only
the version the connector was built against needs to run the archunit
tests and have them reflected in the violation store.


I volunteer to make the release. As it would be my first ASF release, I
might require the guidance of one of the PMC members.


Best

Etienne






Re: [jira] [Created] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-05 Thread Darin Amos
I thought for sure this was already the existing behavior with this
operator. Does it not check the mailbox executor after every record read?

On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira)  wrote:

> Prabhu Joseph created FLINK-33753:
> -
>
>  Summary: ContinuousFileReaderOperator consume records as mini
> batch
>  Key: FLINK-33753
>  URL: https://issues.apache.org/jira/browse/FLINK-33753
>  Project: Flink
>   Issue Type: Improvement
> Affects Versions: 1.18.0
> Reporter: Prabhu Joseph
>
>
> The ContinuousFileReaderOperator reads and collects the records from a
> split in a loop. If the split size is large, then the loop will take more
> time, and then the mailbox executor won't have a chance to process the
> checkpoint barrier. This leads to checkpoint timing out.
> ContinuousFileReaderOperator could be improved to consume the records in a
> mini batch, similar to Hudi's StreamReadOperator (
> https://issues.apache.org/jira/browse/HUDI-2485).
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)
>


Re: [DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-05 Thread Lijie Wang
Hi Paul,

I believe Xintong has answered your question.

>> IIUC, in the FLIP, the main method is lost after the recovery, and only
submitted jobs would be recovered. Is that right?

You are right, we can't recover the execution progress of the main method. So
after the JM crashes, only the submitted and incomplete jobs (as Xintong said,
completed jobs will not be re-run) will be recovered and continue to run.
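
The recovery rule stated above can be written down as a tiny filter. This is only a sketch of the rule, not of the FLIP's design: `RecoverySketch` and `jobsToRecover` are hypothetical names, and the set of completed job IDs stands in for what a JobResultStore would persist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model: after a JM crash, recover submitted jobs that have no recorded result. */
final class RecoverySketch {

    static List<String> jobsToRecover(List<String> submittedJobs, Set<String> completedJobs) {
        List<String> toRecover = new ArrayList<>();
        for (String jobId : submittedJobs) {
            // Completed jobs are skipped so that they are not run again.
            if (!completedJobs.contains(jobId)) {
                toRecover.add(jobId);
            }
        }
        return toRecover;
    }

    public static void main(String[] args) {
        // A job that main() had not yet submitted is unknown here and cannot be recovered.
        System.out.println(jobsToRecover(List.of("job-1", "job-2"), Set.of("job-1")));
    }
}
```

Jobs that the user's main method would have submitted later do not appear in the submitted list at all, which is the limitation Paul asked about.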

Best,
Lijie

Xintong Song wrote on Tue, Dec 5, 2023 at 18:30:

> @Paul,
>
>
> Do you mean the scenario where users call `env.execute()` multiple times in
> the `main()` method? I believe that is not supported currently when HA is
> enabled, for the exact same reason you mentioned that Flink is not aware of
> which jobs are executed and which are not.
>
>
> On the other hand, if an external scheduler is used to submit multiple jobs
> to a session cluster, Flink already has a JobResultStore for persisting
> information about successfully completed jobs, so that only incomplete
> jobs will be recovered. See FLIP-194[1] for more details.
>
>
> Best,
>
> Xintong
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> On Tue, Dec 5, 2023 at 6:01 PM Xintong Song  wrote:
>
> > Thanks for addressing my comments, Lijie. LGTM
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Tue, Dec 5, 2023 at 2:56 PM Paul Lam  wrote:
> >
> >> Hi Lijie,
> >>
> >> Recovery for batch jobs is no doubt a long-awaited feature. Thanks for
> >> the proposal!
> >>
> >> I’m concerned about the multi-job scenario. In session mode, users could
> >> use web submission to upload and run jars which may produce multiple
> >> Flink jobs. However, these jobs may not be submitted at once and run in
> >> parallel. Instead, they could be dependent on other jobs like a DAG. The
> >> schedule of the jobs is controlled by the user's main method.
> >>
> >> IIUC, in the FLIP, the main method is lost after the recovery, and only
> >> submitted jobs would be recovered. Is that right?
> >>
> >> Best,
> >> Paul Lam
> >>
> >> > On Nov 2, 2023, at 18:00, Lijie Wang wrote:
> >> >
> >> > Hi devs,
> >> >
> >> > Zhu Zhu and I would like to start a discussion about FLIP-383: Support Job
> >> > Recovery for Batch Jobs[1]
> >> >
> >> > Currently, when Flink’s job manager crashes or gets killed, possibly due to
> >> > unexpected errors or planned node decommissioning, it will cause one of the
> >> > following two situations:
> >> > 1. Failed, if the job does not enable HA.
> >> > 2. Restart, if the job enables HA. If it’s a streaming job, the job will be
> >> > resumed from the last successful checkpoint. If it’s a batch job, it has to
> >> > run from the beginning, and all previous progress will be lost.
> >> >
> >> > In view of this, we think a JM crash may cause a great regression for batch
> >> > jobs, especially long-running batch jobs. This FLIP mainly aims to solve this
> >> > problem so that batch jobs can recover most of their progress after a JM crash.
> >> > In this FLIP, our goal is that most finished tasks do not need to be re-run.
> >> >
> >> > You can find more details in the FLIP-383[1]. Looking forward to your
> >> > feedback.
> >> >
> >> > [1]
> >> >
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
> >> >
> >> > Best,
> >> > Lijie
> >>
> >>
>


Re: [DISCUSS] Resolve diamond inheritance of Sink.createWriter

2023-12-05 Thread Péter Váry
It seems to me we have a consensus to move forward with the mixin approach.
I hope that everyone is aware that with the mixin interfaces we lose the
opportunity for strong type checks. This will be especially painful for
generic types, where we will have no way to ensure that the generic
types are correctly synchronized between the different interfaces, even at
DAG creation time.
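
To make the type-check drawback concrete, here is a minimal sketch. All interface and class names are simplified, hypothetical stand-ins rather than the real Flink API. The class compiles although the element types of the two interfaces disagree, and the mismatch only surfaces as a ClassCastException when the restored writer is first used.

```java
import java.util.Collections;
import java.util.List;

public class MixinTypeCheckSketch {

    interface Writer<IN> { String write(IN element); }

    interface Sink<IN> { Writer<IN> createWriter(); }

    // Hypothetical mixin: it carries its own, unrelated IN type parameter.
    interface SupportsWriterState<IN, StateT> {
        Writer<IN> restoreWriter(List<StateT> state);
    }

    // Compiles even though the IN types disagree (String vs. Integer).
    static class MismatchedSink implements Sink<String>, SupportsWriterState<Integer, Long> {
        @Override
        public Writer<String> createWriter() { return e -> "string:" + e.length(); }

        @Override
        public Writer<Integer> restoreWriter(List<Long> state) { return e -> "int:" + (e + 1); }
    }

    // Framework-side lookup: the instanceof check cannot see the generic parameters.
    @SuppressWarnings("unchecked")
    static <IN> Writer<IN> writerFor(Sink<IN> sink, boolean restore) {
        if (restore && sink instanceof SupportsWriterState) {
            return ((SupportsWriterState<IN, Object>) sink).restoreWriter(Collections.emptyList());
        }
        return sink.createWriter();
    }

    /** Returns true if the mismatch is only detected when the writer is called. */
    static boolean failsOnlyAtFirstWrite() {
        Writer<String> writer = writerFor(new MismatchedSink(), true); // no error here
        try {
            writer.write("hello"); // the restored writer actually expects an Integer
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsOnlyAtFirstWrite());
    }
}
```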

Even with this drawback, I like this approach too, so +1 from my side.

As a first step in the direction of the mixin approach, we can remove the
specific implementations of the `createWriter` methods from the
`StatefulSink` and the `TwoPhaseCommittingSink` interfaces (and replace them
with an instanceof check where needed).
- This would remove the diamond inheritance and enable us to create default
methods for backward compatibility.
- This would not break the API, as the same method with a wider return type
will be inherited from the `Sink` interface.
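
A rough sketch of what this first step could look like, with heavily simplified interfaces. The names follow the ones used in this thread, but the method signatures and the helper `snapshot` are assumptions for illustration; the PR below has the real change. Neither sub-interface redeclares `createWriter`, so a sink can implement both without a conflicting return type, and the caller checks the writer with instanceof.

```java
import java.util.Collections;
import java.util.List;

public class CreateWriterSketch {

    interface SinkWriter<IN> { void write(IN element); }

    interface StatefulSinkWriter<IN> extends SinkWriter<IN> {
        List<String> snapshotState(long checkpointId);
    }

    interface PrecommittingSinkWriter<IN> extends SinkWriter<IN> {
        List<String> prepareCommit();
    }

    interface Sink<IN> { SinkWriter<IN> createWriter(); }

    // No narrowed createWriter() here, so both interfaces inherit the same method from Sink.
    interface StatefulSink<IN> extends Sink<IN> {}

    interface TwoPhaseCommittingSink<IN> extends Sink<IN> {}

    static class BothWriter implements StatefulSinkWriter<String>, PrecommittingSinkWriter<String> {
        @Override public void write(String element) {}
        @Override public List<String> snapshotState(long checkpointId) {
            return Collections.singletonList("state@" + checkpointId);
        }
        @Override public List<String> prepareCommit() { return Collections.emptyList(); }
    }

    static class BothSink implements StatefulSink<String>, TwoPhaseCommittingSink<String> {
        @Override public SinkWriter<String> createWriter() { return new BothWriter(); }
    }

    // Caller side: the instanceof check replaces the narrowed return type.
    static List<String> snapshot(Sink<String> sink, long checkpointId) {
        SinkWriter<String> writer = sink.createWriter();
        if (writer instanceof StatefulSinkWriter) {
            return ((StatefulSinkWriter<String>) writer).snapshotState(checkpointId);
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        System.out.println(snapshot(new BothSink(), 7L));
    }
}
```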

Since the proposed changes might be easier to understand in code, I have
created a new PR: https://github.com/apache/flink/pull/23876
The PR has 2 commits:
- Reverting the previous change - non-clean, since there were some
additional fixes on the tests -
https://github.com/apache/flink/pull/23876/commits/c7625d5fa62a6e9a182f39f53fb7e5626105f3b0
- The new change with mixin approach, and deprecation -
https://github.com/apache/flink/pull/23876/commits/99ec936966af527598ca49712c1263bc4aa03c15

Thanks,
Peter

weijie guo wrote (on Tue, Dec 5, 2023, 8:01):

> Thanks Martijn for driving this!
>
> I'm +1  to reverting the breaking change.
>
> > For new functionality or changes we can make easily, we should switch to
> the decorative/mixin interface approach used successfully in the source and
> table interfaces.
>
> I like the way of switching to mixin interface.
>
> Best regards,
>
> Weijie
>
>
> Becket Qin wrote on Tue, Dec 5, 2023 at 14:50:
>
> > I am with Gyula about fixing the current SinkV2 API.
> >
> > A SinkV3 seems not necessary because we are not changing the fundamental
> > design of the API. Hopefully we can modify the interface structure a little
> > bit to make it similar to the Source while still keeping backwards
> > compatibility.
> > For example, one approach is:
> >
> > - Add snapshotState(int checkpointId) and precommit() methods to the
> > SinkWriter with default implementation doing nothing. Deprecate
> > StatefulSinkWriter and PrecommittingSinkWriter.
> > - Add two mixin interfaces of SupportsStatefulWrite and
> > SupportsTwoPhaseCommit. Deprecate the StatefulSink and
> > TwoPhaseCommittingSink.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Dec 4, 2023 at 7:25 PM Gyula Fóra  wrote:
> >
> > > Hi All!
> > >
> > > Based on the discussion above, I feel that the most reasonable approach
> > > from both the developers' and the users' perspective at this point is what
> > > Becket lists as Option 1:
> > >
> > > Revert the naming change to the backward compatible version and accept that
> > > the names are not perfect (treat it as legacy).
> > >
> > > On a different note, I agree that the current sink v2 interface is very
> > > difficult to evolve and structuring the interfaces the way they are now is
> > > not a good design in the long run.
> > > For new functionality or changes we can make easily, we should switch to
> > > the decorative/mixin interface approach used successfully in the source and
> > > table interfaces. Let's try to do this as much as possible within the v2
> > > and compatibility boundaries and we should only introduce a v3 if we really
> > > must.
> > >
> > > So from my side, +1 to reverting the naming to keep backward compatibility.
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> > > On Fri, Dec 1, 2023 at 10:43 AM Péter Váry <
> peter.vary.apa...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Becket for your reply!
> > > >
> > > > *On Option 1:*
> > > > - I personally consider API inconsistencies more important, since they will
> > > > remain with us "forever", but this is up to the community. I can implement
> > > > whichever solution we decide upon.
> > > >
> > > > *Option 2:*
> > > > - I don't think this specific issue merits a rewrite, but if we decide to
> > > > change our approach, then it's a different story.
> > > >
> > > > *Evolvability:*
> > > > This discussion reminds me of a similar discussion on FLIP-372 [1], where
> > > > we are trying to decide if we should use mixin interfaces, or use interface
> > > > inheritance.
> > > > With the mixin approach, we have a more flexible interface, but we can't
> > > > check the generic types of the interfaces/classes at compile time, or even
> > > > when we create the DAG. The first issue happens when we call the method and
> > > > fail.
> > > > The issue here is similar:
> > > > - *StatefulSink* needs a writer with a method to `*snapshotState*`
> > > > - *TwoPhaseCommittingSink* needs a writer with `*prepareCommit*`
> > > > - 

[jira] [Created] (FLINK-33755) Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence

2023-12-05 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33755:
---

 Summary: Cleanup usage of deprecated 
StreamExecutionEnvironment#generateSequence
 Key: FLINK-33755
 URL: https://issues.apache.org/jira/browse/FLINK-33755
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
Thank you, Gyula. We are validating whether setting a larger
taskmanager.memory.jvm-overhead.fraction eases this problem, and on the other
side, we are trying to find a way in the deployment path to ease it.

I agree with your proposal; maybe I can find some time to make a PR for
FLINK-33548.

Thank you for your time.

Richard Su

> On Dec 5, 2023, at 21:24, Gyula Fóra wrote:
> 
> I understand your problem but I think you are trying to find a solution in
> the wrong place.
> Have you tried setting taskmanager.memory.jvm-overhead.fraction ? That
> would reserve more memory from the total process memory for non-JVM use.
> 
> Gyula
> 
> On Tue, Dec 5, 2023 at 1:50 PM richard.su  wrote:
> 
>> Sorry, "To be clear, we need a container has memory larger than request,
>> and confirm this pod has Guarantee Qos." which need to be "To be clear, we
>> need a container has memory larger than process.size, and confirm this pod
>> has Guarantee Qos."
>> 
>> Thanks.
>> 
>> Richard Su
>> 
>> 
>>> On Dec 5, 2023, at 20:47, richard.su wrote:
>>> 
>>> Hi, Gyula, yes, this is a special case in our scenarios, sorry about
>> that it's hard to understand,  which we want to reserved some memory beyond
>> the jobmanager or task manager's process.To be clear, we need a container
>> has memory larger than request, and confirm this pod has Guarantee Qos.
>>> 
>>> This is because we encounter the glibc problem inside container with
>> flink job using RocksDB, which reserved memory will help to ease this
>> problem.
>>> 
>>> So I hope the container resources's request can be decoupling from flink
>> configuration.
>>> 
>>> From flink's current implementation, this could not be done.
>>> 
>>> Thanks.
>>> 
>>> Richard Su
>>> 
 On Dec 5, 2023, at 20:28, Gyula Fóra wrote:
 
 Richard, I still don't understand why the current setup doesn't work for
 you. According to
 
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup/
 :
 
 The process memory config (which is what we configure) translates
>> directly
 into the container request size. With the new proposal you can set the
 limit independently.
 
 What you write doesn't make sense to me:
 "user wants to define a flinkdeployment with jobmanager has 1G memory
 resources in container field but config jobmanager.memory.process.size
>> as
 850m"
 
 If you want to have a 1G container you set the memory request
 (process.size) in the spec simply  to 1G. Then you have 1G, there are
>> other
 configs on how this 1G will be split inside the container for various
 purposes but these are all covered in detail by the flink memory
>> configs.
 
 Cheers
 Gyula
 
 On Tue, Dec 5, 2023 at 1:06 PM richard.su 
>> wrote:
 
> I think the new configuration could be :
> 
> "kubernetes.taskmanager.memory.amount" and
> "kubernetes.jobmanager.memory.amount"
> 
> once we can calculate the limit-factor by the different of requests and
> limits.
> 
> when native mode, we no longer check the process.size as default
>> memory,
> but using this configuration for decoupling logic.
> 
> Thanks
> 
> Richard Su
> 
>> On Dec 5, 2023, at 19:22, richard.su wrote:
>> 
>> Hi, Gyula, from my opinion, this still will using flinkDeployment's
> resource field to set jobmanager.memory.process.size, and I have told
>> an
> uncovered case that:
>> 
>> When user wants to define a flinkdeployment with jobmanager has 1G
> memory resources in container field but config
> jobmanager.memory.process.size as 850m, which this solution only
>> improves
> user config and actually makes config more intuitive and easier but not
> make the container resource decoupling flink configuration.
>> 
>> So from my side, I think it need to add new configuration to support
> this proposal, and it need more discussion.
>> 
>> Thanks
>> Chaoran Su
>> 
>> 
>>> On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
>>> 
>>> This is the proposal according to FLINK-33548:
>>> 
>>> spec:
>>>   taskManager:
>>>     resources:
>>>       requests:
>>>         memory: "64Mi"
>>>         cpu: "250m"
>>>       limits:
>>>         memory: "128Mi"
>>>         cpu: "500m"
>>> 
>>> I honestly think this is much more intuitive and easier than using
>> the
>>> podTemplate, which is very complex immediately.
>>> Please tell me what use-case/setup is not covered by this improved
>> spec.
>>> 
>>> Unless there is a big limitation here I am still -1 for modifying the
>>> podTemplate logic and +1 for continuing with FLINK-33548
>>> 
>>> Gyula
>>> 
>>> 
>>> 
>>> On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
>>> surendralilh...@gmail.com> wrote:
>>> 
 Hi Gyula,
 
 FLINK-33548 proposes adding a new resource 

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread Gyula Fóra
I understand your problem but I think you are trying to find a solution in
the wrong place.
Have you tried setting taskmanager.memory.jvm-overhead.fraction ? That
would reserve more memory from the total process memory for non-JVM use.
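
As a rough illustration of how the fraction affects the reserved amount: the JVM overhead is a fraction of the total process memory, clamped between a minimum and a maximum. The sketch below is not Flink's code; the method name is hypothetical, and the values used (fraction 0.1, min 192 MB, max 1 GB) are the defaults as I recall them from the memory docs, so please check them for your version.

```java
public class JvmOverheadSketch {

    /**
     * JVM overhead in MB: a fraction of the total process size, clamped to [minMb, maxMb].
     * The parameters mirror the jvm-overhead fraction/min/max options (assumed semantics).
     */
    static long jvmOverheadMb(long processSizeMb, double fraction, long minMb, long maxMb) {
        long byFraction = (long) (processSizeMb * fraction);
        return Math.max(minMb, Math.min(maxMb, byFraction));
    }

    public static void main(String[] args) {
        long processSizeMb = 4096;
        // Assumed default fraction of 0.1.
        System.out.println(jvmOverheadMb(processSizeMb, 0.1, 192, 1024));
        // Doubling the fraction reserves more native memory inside the same container.
        System.out.println(jvmOverheadMb(processSizeMb, 0.2, 192, 1024));
    }
}
```

Note that with the assumed 1 GB maximum, raising the fraction alone stops helping for large process sizes; the max option would have to be raised as well.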

Gyula

On Tue, Dec 5, 2023 at 1:50 PM richard.su  wrote:

> Sorry, "To be clear, we need a container has memory larger than request,
> and confirm this pod has Guarantee Qos." which need to be "To be clear, we
> need a container has memory larger than process.size, and confirm this pod
> has Guarantee Qos."
>
> Thanks.
>
> Richard Su
>
>
> > On Dec 5, 2023, at 20:47, richard.su wrote:
> >
> > Hi, Gyula, yes, this is a special case in our scenarios, sorry about
> that it's hard to understand,  which we want to reserved some memory beyond
> the jobmanager or task manager's process.To be clear, we need a container
> has memory larger than request, and confirm this pod has Guarantee Qos.
> >
> > This is because we encounter the glibc problem inside container with
> flink job using RocksDB, which reserved memory will help to ease this
> problem.
> >
> > So I hope the container resources's request can be decoupling from flink
> configuration.
> >
> > From flink's current implementation, this could not be done.
> >
> > Thanks.
> >
> > Richard Su
> >
> >> On Dec 5, 2023, at 20:28, Gyula Fóra wrote:
> >>
> >> Richard, I still don't understand why the current setup doesn't work for
> >> you. According to
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup/
> >> :
> >>
> >> The process memory config (which is what we configure) translates
> directly
> >> into the container request size. With the new proposal you can set the
> >> limit independently.
> >>
> >> What you write doesn't make sense to me:
> >> "user wants to define a flinkdeployment with jobmanager has 1G memory
> >> resources in container field but config jobmanager.memory.process.size
> as
> >> 850m"
> >>
> >> If you want to have a 1G container you set the memory request
> >> (process.size) in the spec simply  to 1G. Then you have 1G, there are
> other
> >> configs on how this 1G will be split inside the container for various
> >> purposes but these are all covered in detail by the flink memory
> configs.
> >>
> >> Cheers
> >> Gyula
> >>
> >> On Tue, Dec 5, 2023 at 1:06 PM richard.su 
> wrote:
> >>
> >>> I think the new configuration could be :
> >>>
> >>> "kubernetes.taskmanager.memory.amount" and
> >>> "kubernetes.jobmanager.memory.amount"
> >>>
> >>> once we can calculate the limit-factor by the different of requests and
> >>> limits.
> >>>
> >>> when native mode, we no longer check the process.size as default
> memory,
> >>> but using this configuration for decoupling logic.
> >>>
> >>> Thanks
> >>>
> >>> Richard Su
> >>>
>  On Dec 5, 2023, at 19:22, richard.su wrote:
> 
>  Hi, Gyula, from my opinion, this still will using flinkDeployment's
> >>> resource field to set jobmanager.memory.process.size, and I have told
> an
> >>> uncovered case that:
> 
>  When user wants to define a flinkdeployment with jobmanager has 1G
> >>> memory resources in container field but config
> >>> jobmanager.memory.process.size as 850m, which this solution only
> improves
> >>> user config and actually makes config more intuitive and easier but not
> >>> make the container resource decoupling flink configuration.
> 
>  So from my side, I think it need to add new configuration to support
> >>> this proposal, and it need more discussion.
> 
>  Thanks
>  Chaoran Su
> 
> 
> > On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
> >
> > This is the proposal according to FLINK-33548:
> >
> > spec:
> >   taskManager:
> >     resources:
> >       requests:
> >         memory: "64Mi"
> >         cpu: "250m"
> >       limits:
> >         memory: "128Mi"
> >         cpu: "500m"
> >
> > I honestly think this is much more intuitive and easier than using
> the
> > podTemplate, which is very complex immediately.
> > Please tell me what use-case/setup is not covered by this improved
> spec.
> >
> > Unless there is a big limitation here I am still -1 for modifying the
> > podTemplate logic and +1 for continuing with FLINK-33548
> >
> > Gyula
> >
> >
> >
> > On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
> > surendralilh...@gmail.com> wrote:
> >
> >> Hi Gyula,
> >>
> >> FLINK-33548 proposes adding a new resource field to match with
> >>> Kubernetes
> >> pod resource configuration. Here's my suggestion: instead of adding
> a
> >>> new
> >> resource field, let's use a pod template for more advanced resource
> >>> setup.
> >> Adding a new resource field might confuse users. This change can
> also
> >>> help
> >> with issues when users use Flink Kubernetes commands directly,
> without
> >>> the
> >> operator.
> >>
> >> Thanks
> >> Surendra
> >>
> >>
> >> On Tue, Dec 5, 2023 

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread Martijn Visser
Hi Richard,

Shouldn't the solution then be solving the glibc problem?

Best regards,

Martijn

On Tue, Dec 5, 2023 at 1:49 PM richard.su  wrote:
>
> Sorry, "To be clear, we need a container has memory larger than request, and 
> confirm this pod has Guarantee Qos." which need to be "To be clear, we need a 
> container has memory larger than process.size, and confirm this pod has 
> Guarantee Qos."
>
> Thanks.
>
> Richard Su
>
>
> > On Dec 5, 2023, at 20:47, richard.su wrote:
> >
> > Hi, Gyula, yes, this is a special case in our scenarios, sorry about that 
> > it's hard to understand,  which we want to reserved some memory beyond the 
> > jobmanager or task manager's process.To be clear, we need a container has 
> > memory larger than request, and confirm this pod has Guarantee Qos.
> >
> > This is because we encounter the glibc problem inside container with flink 
> > job using RocksDB, which reserved memory will help to ease this problem.
> >
> > So I hope the container resources's request can be decoupling from flink 
> > configuration.
> >
> > From flink's current implementation, this could not be done.
> >
> > Thanks.
> >
> > Richard Su
> >
> >> On Dec 5, 2023, at 20:28, Gyula Fóra wrote:
> >>
> >> Richard, I still don't understand why the current setup doesn't work for
> >> you. According to
> >> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup/
> >> :
> >>
> >> The process memory config (which is what we configure) translates directly
> >> into the container request size. With the new proposal you can set the
> >> limit independently.
> >>
> >> What you write doesn't make sense to me:
> >> "user wants to define a flinkdeployment with jobmanager has 1G memory
> >> resources in container field but config jobmanager.memory.process.size as
> >> 850m"
> >>
> >> If you want to have a 1G container you set the memory request
> >> (process.size) in the spec simply  to 1G. Then you have 1G, there are other
> >> configs on how this 1G will be split inside the container for various
> >> purposes but these are all covered in detail by the flink memory configs.
> >>
> >> Cheers
> >> Gyula
> >>
> >> On Tue, Dec 5, 2023 at 1:06 PM richard.su  wrote:
> >>
> >>> I think the new configuration could be :
> >>>
> >>> "kubernetes.taskmanager.memory.amount" and
> >>> "kubernetes.jobmanager.memory.amount"
> >>>
> >>> once we can calculate the limit-factor by the different of requests and
> >>> limits.
> >>>
> >>> when native mode, we no longer check the process.size as default memory,
> >>> but using this configuration for decoupling logic.
> >>>
> >>> Thanks
> >>>
> >>> Richard Su
> >>>
>  On Dec 5, 2023, at 19:22, richard.su wrote:
> 
>  Hi, Gyula, from my opinion, this still will using flinkDeployment's
> >>> resource field to set jobmanager.memory.process.size, and I have told an
> >>> uncovered case that:
> 
>  When user wants to define a flinkdeployment with jobmanager has 1G
> >>> memory resources in container field but config
> >>> jobmanager.memory.process.size as 850m, which this solution only improves
> >>> user config and actually makes config more intuitive and easier but not
> >>> make the container resource decoupling flink configuration.
> 
>  So from my side, I think it need to add new configuration to support
> >>> this proposal, and it need more discussion.
> 
>  Thanks
>  Chaoran Su
> 
> 
> > On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
> >
> > This is the proposal according to FLINK-33548:
> >
> > spec:
> >   taskManager:
> >     resources:
> >       requests:
> >         memory: "64Mi"
> >         cpu: "250m"
> >       limits:
> >         memory: "128Mi"
> >         cpu: "500m"
> >
> > I honestly think this is much more intuitive and easier than using the
> > podTemplate, which is very complex immediately.
> > Please tell me what use-case/setup is not covered by this improved spec.
> >
> > Unless there is a big limitation here I am still -1 for modifying the
> > podTemplate logic and +1 for continuing with FLINK-33548
> >
> > Gyula
> >
> >
> >
> > On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
> > surendralilh...@gmail.com> wrote:
> >
> >> Hi Gyula,
> >>
> >> FLINK-33548 proposes adding a new resource field to match with
> >>> Kubernetes
> >> pod resource configuration. Here's my suggestion: instead of adding a
> >>> new
> >> resource field, let's use a pod template for more advanced resource
> >>> setup.
> >> Adding a new resource field might confuse users. This change can also
> >>> help
> >> with issues when users use Flink Kubernetes commands directly, without
> >>> the
> >> operator.
> >>
> >> Thanks
> >> Surendra
> >>
> >>
> >> On Tue, Dec 5, 2023 at 3:10 PM richard.su 
> >>> wrote:
> >>
> >>> Sorry Gyula,  let me explain more about the point of 2, if I avoid the
> >>> override, I will got a 

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
Sorry, the sentence "To be clear, we need a container that has more memory
than the request, while making sure the pod still has Guaranteed QoS." should
have been "To be clear, we need a container that has more memory than
process.size, while making sure the pod still has Guaranteed QoS."

Thanks.

Richard Su


> On Dec 5, 2023, at 20:47, richard.su wrote:
> 
> Hi, Gyula, yes, this is a special case in our scenarios, sorry about that 
> it's hard to understand,  which we want to reserved some memory beyond the 
> jobmanager or task manager's process.To be clear, we need a container has 
> memory larger than request, and confirm this pod has Guarantee Qos.
> 
> This is because we encounter the glibc problem inside container with flink 
> job using RocksDB, which reserved memory will help to ease this problem.
> 
> So I hope the container resources's request can be decoupling from flink 
> configuration.
> 
> From flink's current implementation, this could not be done.
> 
> Thanks.
> 
> Richard Su
> 
>> On Dec 5, 2023, at 20:28, Gyula Fóra wrote:
>> 
>> Richard, I still don't understand why the current setup doesn't work for
>> you. According to
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup/
>> :
>> 
>> The process memory config (which is what we configure) translates directly
>> into the container request size. With the new proposal you can set the
>> limit independently.
>> 
>> What you write doesn't make sense to me:
>> "user wants to define a flinkdeployment with jobmanager has 1G memory
>> resources in container field but config jobmanager.memory.process.size as
>> 850m"
>> 
>> If you want to have a 1G container you set the memory request
>> (process.size) in the spec simply  to 1G. Then you have 1G, there are other
>> configs on how this 1G will be split inside the container for various
>> purposes but these are all covered in detail by the flink memory configs.
>> 
>> Cheers
>> Gyula
>> 
>> On Tue, Dec 5, 2023 at 1:06 PM richard.su  wrote:
>> 
>>> I think the new configuration could be :
>>> 
>>> "kubernetes.taskmanager.memory.amount" and
>>> "kubernetes.jobmanager.memory.amount"
>>> 
>>> once we can calculate the limit-factor by the different of requests and
>>> limits.
>>> 
>>> when native mode, we no longer check the process.size as default memory,
>>> but using this configuration for decoupling logic.
>>> 
>>> Thanks
>>> 
>>> Richard Su
>>> 
 On Dec 5, 2023, at 19:22, richard.su wrote:
 
 Hi, Gyula, from my opinion, this still will using flinkDeployment's
>>> resource field to set jobmanager.memory.process.size, and I have told an
>>> uncovered case that:
 
 When user wants to define a flinkdeployment with jobmanager has 1G
>>> memory resources in container field but config
>>> jobmanager.memory.process.size as 850m, which this solution only improves
>>> user config and actually makes config more intuitive and easier but not
>>> make the container resource decoupling flink configuration.
 
 So from my side, I think it need to add new configuration to support
>>> this proposal, and it need more discussion.
 
 Thanks
 Chaoran Su
 
 
> On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
> 
> This is the proposal according to FLINK-33548:
> 
> spec:
>   taskManager:
>     resources:
>       requests:
>         memory: "64Mi"
>         cpu: "250m"
>       limits:
>         memory: "128Mi"
>         cpu: "500m"
> 
> I honestly think this is much more intuitive and easier than using the
> podTemplate, which is very complex immediately.
> Please tell me what use-case/setup is not covered by this improved spec.
> 
> Unless there is a big limitation here I am still -1 for modifying the
> podTemplate logic and +1 for continuing with FLINK-33548
> 
> Gyula
> 
> 
> 
> On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
> surendralilh...@gmail.com> wrote:
> 
>> Hi Gyula,
>> 
>> FLINK-33548 proposes adding a new resource field to match with
>>> Kubernetes
>> pod resource configuration. Here's my suggestion: instead of adding a
>>> new
>> resource field, let's use a pod template for more advanced resource
>>> setup.
>> Adding a new resource field might confuse users. This change can also
>>> help
>> with issues when users use Flink Kubernetes commands directly, without
>>> the
>> operator.
>> 
>> Thanks
>> Surendra
>> 
>> 
>> On Tue, Dec 5, 2023 at 3:10 PM richard.su 
>>> wrote:
>> 
>>> Sorry Gyula, let me explain more about point 2: if I avoid the
>>> override, I will get a jobmanager pod whose resources are still consistent
>>> with “jobmanager.memory.process.size”, but a FlinkDeployment with a
>>> resource larger than that.
>>> 
>>> Thanks for your time.
>>> Richard Su
>>> 
 On Dec 5, 2023, at 17:13, richard.su wrote:
 
 Thank you for your time, Gyula, I have more question about
>>> Flink-33548,
>>> we can 

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
Hi Gyula, yes, this is a special case in our scenario, and sorry that it is
hard to understand. We want to reserve some memory beyond the jobmanager or
taskmanager process. To be clear, we need a container that has more memory
than the request, while making sure the pod still has Guaranteed QoS.

This is because we encounter the glibc problem inside the container with Flink
jobs using RocksDB, and the reserved memory helps to ease this problem.

So I hope the container's resource request can be decoupled from the Flink
configuration.

With Flink's current implementation, this cannot be done.

Thanks.

Richard Su

> On Dec 5, 2023, at 20:28, Gyula Fóra wrote:
> 
> Richard, I still don't understand why the current setup doesn't work for
> you. According to
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup/
> :
> 
> The process memory config (which is what we configure) translates directly
> into the container request size. With the new proposal you can set the
> limit independently.
> 
> What you write doesn't make sense to me:
> "user wants to define a flinkdeployment with jobmanager has 1G memory
> resources in container field but config jobmanager.memory.process.size as
> 850m"
> 
> If you want to have a 1G container you set the memory request
> (process.size) in the spec simply  to 1G. Then you have 1G, there are other
> configs on how this 1G will be split inside the container for various
> purposes but these are all covered in detail by the flink memory configs.
> 
> Cheers
> Gyula
> 
> On Tue, Dec 5, 2023 at 1:06 PM richard.su  wrote:
> 
>> I think the new configuration could be :
>> 
>> "kubernetes.taskmanager.memory.amount" and
>> "kubernetes.jobmanager.memory.amount"
>> 
>> once we can calculate the limit-factor by the different of requests and
>> limits.
>> 
>> when native mode, we no longer check the process.size as default memory,
>> but using this configuration for decoupling logic.
>> 
>> Thanks
>> 
>> Richard Su
>> 
>>> On Dec 5, 2023, at 19:22, richard.su wrote:
>>> 
>>> Hi, Gyula, from my opinion, this still will using flinkDeployment's
>> resource field to set jobmanager.memory.process.size, and I have told an
>> uncovered case that:
>>> 
>>> When user wants to define a flinkdeployment with jobmanager has 1G
>> memory resources in container field but config
>> jobmanager.memory.process.size as 850m, which this solution only improves
>> user config and actually makes config more intuitive and easier but not
>> make the container resource decoupling flink configuration.
>>> 
>>> So from my side, I think it need to add new configuration to support
>> this proposal, and it need more discussion.
>>> 
>>> Thanks
>>> Chaoran Su
>>> 
>>> 
 On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
 
 This is the proposal according to FLINK-33548:
 
 spec:
   taskManager:
     resources:
       requests:
         memory: "64Mi"
         cpu: "250m"
       limits:
         memory: "128Mi"
         cpu: "500m"
 
 I honestly think this is much more intuitive and easier than using the
 podTemplate, which is very complex immediately.
 Please tell me what use-case/setup is not covered by this improved spec.
 
 Unless there is a big limitation here I am still -1 for modifying the
 podTemplate logic and +1 for continuing with FLINK-33548
 
 Gyula
 
 
 
 On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
 surendralilh...@gmail.com> wrote:
 
> Hi Gyula,
> 
> FLINK-33548 proposes adding a new resource field to match with
>> Kubernetes
> pod resource configuration. Here's my suggestion: instead of adding a
>> new
> resource field, let's use a pod template for more advanced resource
>> setup.
> Adding a new resource field might confuse users. This change can also
>> help
> with issues when users use Flink Kubernetes commands directly, without
>> the
> operator.
> 
> Thanks
> Surendra
> 
> 
> On Tue, Dec 5, 2023 at 3:10 PM richard.su 
>> wrote:
> 
>> Sorry Gyula, let me explain more about point 2: if I avoid the
>> override, I will get a jobmanager pod whose resources are still consistent
>> with “jobmanager.memory.process.size”, but a FlinkDeployment with a
>> resource larger than that.
>> 
>> Thanks for your time.
>> Richard Su
>> 
>>> On Dec 5, 2023, at 17:13, richard.su wrote:
>>> 
>>> Thank you for your time, Gyula, I have more question about
>> Flink-33548,
>> we can have more discussion about this and make progress:
>>> 
>>> 1. I agree with you about declaring resources in FlinkDeployment
>> resource sections. But Flink Operator will override the
>> “jobmanager.memory.process.size”  and
>> "taskmanager.memory.process.size",
>> despite I have set these configuration or not in flink configuration.
>> If
>> user had configured all memory attributes, the override will leads to
> error
>> as 

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread Gyula Fóra
Richard, I still don't understand why the current setup doesn't work for
you. According to
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup/
:

The process memory config (which is what we configure) translates directly
into the container request size. With the new proposal you can set the
limit independently.

What you write doesn't make sense to me:
"user wants to define a flinkdeployment with jobmanager has 1G memory
resources in container field but config jobmanager.memory.process.size as
850m"

If you want to have a 1G container, you simply set the memory request
(process.size) in the spec to 1G. Then you have 1G; there are other
configs on how this 1G will be split inside the container for various
purposes, but these are all covered in detail by the Flink memory configs.
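
To make the request/limit relationship concrete, here is a small sketch. It only illustrates the arithmetic and is not operator or Flink code: the helper names are hypothetical, and it assumes process.size is used as the container request while the limit is the request scaled by a limit factor, as discussed in this thread.

```java
public class LimitFactorSketch {

    /** Ratio between the container memory limit and request (hypothetical helper). */
    static double limitFactor(long requestMb, long limitMb) {
        if (requestMb <= 0 || limitMb < requestMb) {
            throw new IllegalArgumentException("need 0 < request <= limit");
        }
        return (double) limitMb / requestMb;
    }

    /** Limit derived from process.size (assumed to be the request) and a limit factor. */
    static long limitMb(long processSizeMb, double limitFactor) {
        return (long) Math.ceil(processSizeMb * limitFactor);
    }

    public static void main(String[] args) {
        // Values from the spec example in this thread: request 64Mi, limit 128Mi.
        double factor = limitFactor(64, 128);
        System.out.println(factor);
        System.out.println(limitMb(1024, factor));
    }
}
```

One consequence worth keeping in mind: Kubernetes only assigns the Guaranteed QoS class when requests equal limits, which corresponds to a factor of 1.0 here.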

Cheers
Gyula

On Tue, Dec 5, 2023 at 1:06 PM richard.su  wrote:

> I think the new configuration could be:
>
> "kubernetes.taskmanager.memory.amount" and
> "kubernetes.jobmanager.memory.amount"
>
> With these, we can calculate the limit-factor from the difference between
> requests and limits.
>
> In native mode, we would no longer use process.size as the default memory,
> but use this configuration to decouple the logic.
>
> Thanks
>
> Richard Su
>
> > On Dec 5, 2023, at 19:22, richard.su wrote:
> >
> > Hi, Gyula, from my opinion, this still will using flinkDeployment's
> resource filed to set jobManager.memory.process.size, and I have told an
> uncovered case that:
> >
> > When user wants to define a flinkdeployment with jobmanager has 1G
> memory resources in container field but config
> jobmanager.memory.process.size as 850m, which this solution only improves
> user config and actually make sconfig more intuitive and easier but not
> make the container resource decoupling flink configuration.
> >
> > So from my side, I think it need to add new configuration to support
> this proposal, and it need more discussion.
> >
> > Thanks
> > Chaoran Su
> >
> >
> >> On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
> >>
> >> This is the proposal according to FLINK-33548:
> >>
> >> spec:
> >>   taskManager:
> >>     resources:
> >>       requests:
> >>         memory: "64Mi"
> >>         cpu: "250m"
> >>       limits:
> >>         memory: "128Mi"
> >>         cpu: "500m"
> >>
> >> I honestly think this is much more intuitive and easier than using the
> >> podTemplate, which is very complex immediately.
> >> Please tell me what use-case/setup is not covered by this improved spec.
> >>
> >> Unless there is a big limitation here I am still -1 for modifying the
> >> podTemplate logic and +1 for continuing with FLINK-33548
> >>
> >> Gyula
> >>
> >>
> >>
> >> On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
> >> surendralilh...@gmail.com> wrote:
> >>
> >>> Hi Gyula,
> >>>
> >>> FLINK-33548 proposes adding a new resource field to match with
> Kubernetes
> >>> pod resource configuration. Here's my suggestion: instead of adding a
> new
> >>> resource field, let's use a pod template for more advanced resource
> setup.
> >>> Adding a new resource field might confuse users. This change can also
> help
> >>> with issues when users use Flink Kubernetes commands directly, without
> the
> >>> operator.
> >>>
> >>> Thanks
> >>> Surendra
> >>>
> >>>
> >>> On Tue, Dec 5, 2023 at 3:10 PM richard.su 
> wrote:
> >>>
>  Sorry Gyula,  let me explain more about the point of 2, if I avoid the
>  override, I will got a jobmanager pod still with resources consist
> with
>  “jobmanager.memory.process.size”, but a flinkdeployment with a
> resource
>  larger than that.
> 
>  Thanks for your time.
>  Richard Su
> 
> > On Dec 5, 2023, at 17:13, richard.su wrote:
> >
> > Thank you for your time, Gyula, I have more question about
> Flink-33548,
>  we can have more discussion about this and make progress:
> >
> > 1. I agree with you about declaring resources in FlinkDeployment
>  resource sections. But Flink Operator will override the
>  “jobmanager.memory.process.size”  and
> "taskmanager.memory.process.size",
>  despite I have set these configuration or not in flink configuration.
> If
>  user had configured all memory attributes, the override will leads to
> >>> error
>  as the overall computation is error.
> >
> > the code of override is in FlinkConfigManager.class in buildFrom
> >>> method,
>  which apply to JobmanagerSpec and TaskManagerSpec.
> >
> > 2. If I modified the code of override, I will still encounter this
> >>> issue
>  of FLINK-24150, because I only modified the code of flink operator but
> >>> not
>  flink-kubernetes package, so I will make a pod resources like (cpu:1c
>  memory:1g) and container resource to be (cpu:1c, memory 850m),
> because I
>  already set jobmanager.memory.process.size to 850m.
> >
> > 3. because of there two point, we need to make the podTemplate have
>  higher priority. Otherwise we can refactor the code of flink operator,
>  

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
I think the new configuration options could be:

"kubernetes.taskmanager.memory.amount" and "kubernetes.jobmanager.memory.amount"

With these, we can calculate the limit-factor from the difference between requests and limits.

In native mode, we would no longer use process.size as the default container memory, but 
would use this new configuration instead, which decouples the two.
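
To make the limit-factor idea concrete, here is a minimal sketch. It assumes that "limit-factor" means the ratio of the container limit to the container request, which is how I read the message above. The class and method names are hypothetical and are not part of Flink or the operator.

```java
// Hypothetical helper for illustration only; not Flink or operator code.
final class LimitFactorSketch {

    /**
     * Derives a limit factor as limit / request.
     * Assumption: the factor is a plain ratio and must be at least 1.0.
     */
    static double limitFactor(long requestBytes, long limitBytes) {
        if (requestBytes <= 0) {
            throw new IllegalArgumentException("request must be positive");
        }
        if (limitBytes < requestBytes) {
            throw new IllegalArgumentException("limit must not be smaller than request");
        }
        return (double) limitBytes / requestBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Values taken from the spec quoted in this thread: 64Mi request, 128Mi limit.
        System.out.println(limitFactor(64 * mib, 128 * mib)); // prints 2.0
    }
}
```

With a request of 64Mi and a limit of 128Mi the factor is 2.0, so the limit could be reconstructed from the request without a second memory option.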

Thanks

Richard Su

> On Dec 5, 2023, at 19:22, richard.su wrote:
> 
> Hi, Gyula, from my opinion, this still will using flinkDeployment's resource 
> filed to set jobManager.memory.process.size, and I have told an uncovered 
> case that:
> 
> When user wants to define a flinkdeployment with jobmanager has 1G memory 
> resources in container field but config jobmanager.memory.process.size as 
> 850m, which this solution only improves user config and actually make sconfig 
> more intuitive and easier but not make the container resource decoupling 
> flink configuration.
> 
> So from my side, I think it need to add new configuration to support this 
> proposal, and it need more discussion.
> 
> Thanks
> Chaoran Su
> 
> 
>> On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
>> 
>> This is the proposal according to FLINK-33548:
>> 
>> spec:
>>   taskManager:
>>     resources:
>>       requests:
>>         memory: "64Mi"
>>         cpu: "250m"
>>       limits:
>>         memory: "128Mi"
>>         cpu: "500m"
>> 
>> I honestly think this is much more intuitive and easier than using the
>> podTemplate, which is very complex immediately.
>> Please tell me what use-case/setup is not covered by this improved spec.
>> 
>> Unless there is a big limitation here I am still -1 for modifying the
>> podTemplate logic and +1 for continuing with FLINK-33548
>> 
>> Gyula
>> 
>> 
>> 
>> On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
>> surendralilh...@gmail.com> wrote:
>> 
>>> Hi Gyula,
>>> 
>>> FLINK-33548 proposes adding a new resource field to match with Kubernetes
>>> pod resource configuration. Here's my suggestion: instead of adding a new
>>> resource field, let's use a pod template for more advanced resource setup.
>>> Adding a new resource field might confuse users. This change can also help
>>> with issues when users use Flink Kubernetes commands directly, without the
>>> operator.
>>> 
>>> Thanks
>>> Surendra
>>> 
>>> 
>>> On Tue, Dec 5, 2023 at 3:10 PM richard.su  wrote:
>>> 
 Sorry Gyula,  let me explain more about the point of 2, if I avoid the
 override, I will got a jobmanager pod still with resources consist with
 “jobmanager.memory.process.size”, but a flinkdeployment with a resource
 larger than that.
 
 Thanks for your time.
 Richard Su
 
> On Dec 5, 2023, at 17:13, richard.su wrote:
> 
> Thank you for your time, Gyula, I have more question about Flink-33548,
 we can have more discussion about this and make progress:
> 
> 1. I agree with you about declaring resources in FlinkDeployment
 resource sections. But Flink Operator will override the
 “jobmanager.memory.process.size”  and "taskmanager.memory.process.size",
 despite I have set these configuration or not in flink configuration. If
 user had configured all memory attributes, the override will leads to
>>> error
 as the overall computation is error.
> 
> the code of override is in FlinkConfigManager.class in buildFrom
>>> method,
 which apply to JobmanagerSpec and TaskManagerSpec.
> 
> 2. If I modified the code of override, I will still encounter this
>>> issue
 of FLINK-24150, because I only modified the code of flink operator but
>>> not
 flink-kubernetes package, so I will make a pod resources like (cpu:1c
 memory:1g) and container resource to be (cpu:1c, memory 850m), because I
 already set jobmanager.memory.process.size to 850m.
> 
> 3. because of there two point, we need to make the podTemplate have
 higher priority. Otherwise we can refactor the code of flink operator,
 which should import something new configuration to support the native
>>> mode.
> 
> I think it will be better to import some configuration, which
 FlinkConfigManager.class can override it using the resource of
 JobmanagerSpec and TaskManagerSpec.
> 
> When it deep into the code flink-kubernetes package, we using these new
 configuration as the final result of containers resources.
> 
> Thanks for your time.
> Richard Su
> 
>> On Dec 5, 2023, at 16:45, Gyula Fóra wrote:
>> 
>> As you can see in the jira ticket there hasn't been any progress,
>>> nobody
>> started to work on this yet.
>> 
>> I personally don't think it's confusing to declare resources in the
>> FlinkDeployment resource sections. It's well documented and worked
>>> very
>> well so far for most users.
>> This is pretty common practice for kubernetes.
>> 
>> Cheers,
>> Gyula
>> 
>> On Tue, Dec 5, 2023 at 9:35 AM richard.su 
 wrote:
>> 
>>> Hi, Gyula, is there had any progress in FLINK-33548? I would like to
 

Re: Re: [DISCUSS] Proposing an LTS Release for the 1.x Line

2023-12-05 Thread Alexander Fedulov
Hi Julian,

Could you please remove the duplicated "RE:" in the subject of the reply?
That way we can continue this discussion in the original thread.
(Apache deals with it correctly, but not all email clients/services do,
e.g. Gmail)

Thanks,
Alex

On Tue, 5 Dec 2023 at 09:39, Payne, Julian 
wrote:

> Hey all,
> Thanks for this proposal, I think it makes a lot of sense. I am curious
> to know what time horizon we would consider for LTS of 1.x. Customers value
> knowing when versions will be deprecated so they can build migration into their
> planning and resourcing cycles, so I would be in favour of being
> transparent on how long the community will support 1.x.
>
> Thanks
>
>
> Julian
>
> On 2023/07/26 14:16:43 Konstantin Knauf wrote:
> > Hi Jing,
> >
> > > How could we help users and avoid this happening?
> >
> > I don't think we will be able to avoid this in all cases. And I think
> > that's ok. It's always a trade-off between supporting new use cases and
> > moving the project forward and backwards compatibility (in a broad sense).
> > For example, we dropped Mesos support in a minor release in the past. If
> > your only option for running Flink was Mesos, you were stuck on Flink
> > 1.13 or so.
> >
> > So, I think, it is in the end a case-by-case decision. How big is the cost
> > of continuing to support a "legacy feature/system" and how many users are
> > affected to which degree by dropping it?
> >
> > Best,
> >
> > Konstantin
> >
> >
> > On Tue, Jul 25, 2023 at 18:34, Jing Ge wrote:
> >
> > > Hi Konstantin,
> > >
> > > I might have not made myself clear enough, apologies. The
> > > source-/sink-function was used as a concrete example to discuss the
> pattern
> > > before we decided to offer LTS. The intention was not to hijack this
> thread
> > > to discuss how to deprecate them.
> > >
> > > We all wish that the only thing users need to migrate from Flink 1.x
> to 2.0
> > > is some code changes in their repos and we all wish users will
> migrate, if
> > > LTS has long enough support time. But the question I tried to discuss
> is
> > > not the wish but the "How?". We might be able to toss the high
> migration
> > > effort aside(we shouldn't), since it is theoretically still doable if
> users
> > > have long enough time, even if the effort is extremely high. Another
> > > concern is that if "function regressions" is allowed in 2.0, i.e. if
> 2.0
> > > has a lack of functionalities or bugs compared to 1.x, there will be
> no way
> > > for users to do the migration regardless of whether we encourage them
> to
> > > migrate or they haven been given enough time(how long is enough?)
> because
> > > LTS has been offered. How could we help users and avoid this happening?
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Jul 25, 2023 at 6:57 PM Konstantin Knauf 
> > > wrote:
> > >
> > > > Hi Jing,
> > > >
> > > > let's not overindex on the Source-/SinkFunction discussion in this
> > > thread.
> > > >
> > > > We will generally drop/break a lot of APIs in Flink 2.0. So,
> naturally
> > > > users will need to make more changes to their code in order to
> migrate
> > > from
> > > > 1.x to Flink 2.0. In order to give them more time to do this, we
> support
> > > > the last Flink 1.x release for a longer time with bug fix releases.
> > > >
> > > > Of course, we still encourage users to migrate to Flink 2.0, because
> at
> > > > some point, we will stop support Flink 1.x. For example, if we
> followed
> > > > Marton's proposal we would support Flink 1.x LTS for about 2 years
> > > (roughly
> > > > 4 minor release cycles) instead of about 1 year (2 minor release
> cycles)
> > > > for regular minor releases. This seems like a reasonable timeframe
> to me.
> > > > It also gives us more time to discover and address blockers in
> migrating
> > > to
> > > > Flink 2.x that we are not aware of right now.
> > > >
> > > > Best,
> > > >
> > > > Konstantin
> > > >
> > > > On Tue, Jul 25, 2023 at 12:48, Jing Ge wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Overall, it is a good idea to provide the LTS release, but I'd
> like to
> > > > > reference a concrete case as an example to understand what
> restrictions
> > > > the
> > > > > LTS should have.
> > > > >
> > > > > Hypothetically, Source-/Sink- Function have been deprecated in 1.x
> LTS
> > > > and
> > > > > removed in 2.0 and the issues[1] are not solved in 2.0. This is a
> > > typical
> > > > > scenario that the old APIs are widely used in 1.x LTS and the new
> APIs
> > > in
> > > > > 2.0 are not ready yet to take over all users. We will have the
> > > following
> > > > > questions:
> > > > >
> > > > > 1. Is this scenario allowed at all? Do we all agree that there
> could be
> > > > > some features/functionalities that only work in 1.x LTS after 2.0
> has
> > > > been
> > > > > released?
> > > > > 2. How long are we going to support 1.x LTS? 1 year? 2 years? As
> long
> > > as
> > > > > the issues that block users from migrating to 2.0 

[jira] [Created] (FLINK-33754) Serialize QueryOperations into SQL

2023-12-05 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-33754:


 Summary: Serialize QueryOperations into SQL
 Key: FLINK-33754
 URL: https://issues.apache.org/jira/browse/FLINK-33754
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.19.0








[jira] [Created] (FLINK-33752) When Duration is greater than or equal to 1 day, the display unit is ms.

2023-12-05 Thread Rui Fan (Jira)
Rui Fan created FLINK-33752:
---

 Summary: When Duration is greater than or equal to 1 day, the display unit is ms.
 Key: FLINK-33752
 URL: https://issues.apache.org/jira/browse/FLINK-33752
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.18.0
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 1.19.0, 1.18.1
 Attachments: image-2023-12-05-19-44-17-161.png

When the default value of a Duration option is 24 hours or 1 day, the display unit is 
ms (86400000 ms).

For example, the Kubernetes operator docs have 3 options whose default value is shown as 
86400000 ms.

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/

!image-2023-12-05-19-44-17-161.png!
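
As an illustration of the expected behaviour, the following is a minimal sketch of formatting a duration with the largest unit that divides it exactly. It is not the Flink implementation. The class name, method name and unit labels are assumptions made for this example.

```java
import java.time.Duration;

// Hypothetical formatter for illustration; not the code used by Flink's docs generator.
final class DurationDisplaySketch {

    /** Picks the largest unit (d, h, min, s, ms) that represents the value without a remainder. */
    static String formatWithHighestUnit(Duration duration) {
        long millis = duration.toMillis();
        if (millis == 0) {
            return "0 ms";
        }
        if (millis % 86_400_000L == 0) {
            return (millis / 86_400_000L) + " d";
        }
        if (millis % 3_600_000L == 0) {
            return (millis / 3_600_000L) + " h";
        }
        if (millis % 60_000L == 0) {
            return (millis / 60_000L) + " min";
        }
        if (millis % 1_000L == 0) {
            return (millis / 1_000L) + " s";
        }
        return millis + " ms";
    }

    public static void main(String[] args) {
        System.out.println(formatWithHighestUnit(Duration.ofHours(24))); // prints 1 d
    }
}
```

The day check has to come before the hour check; otherwise 24 hours would be shown as "24 h" rather than "1 d".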





[jira] [Created] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-05 Thread Prabhu Joseph (Jira)
Prabhu Joseph created FLINK-33753:
-

 Summary: ContinuousFileReaderOperator consume records as mini batch
 Key: FLINK-33753
 URL: https://issues.apache.org/jira/browse/FLINK-33753
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.18.0
Reporter: Prabhu Joseph


The ContinuousFileReaderOperator reads and collects the records from a split in 
a loop. If the split is large, the loop takes a long time, and the mailbox 
executor does not get a chance to process the checkpoint barrier in the meantime. 
This leads to checkpoints timing out. ContinuousFileReaderOperator could be 
improved to consume the records in mini batches, similar to Hudi's 
StreamReadOperator (https://issues.apache.org/jira/browse/HUDI-2485).
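
The mini-batch idea can be sketched with a plain queue standing in for the mailbox. This is a simplified model, not the operator code: the class, the queue-based mailbox and the "checkpoint-barrier" marker are all hypothetical. The reader emits at most a fixed number of records per mail and then re-enqueues itself, so other mail gets a turn in between.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Simplified model of mini-batch reading; all names are hypothetical.
final class MiniBatchReaderSketch {
    private final Iterator<String> split;
    private final int batchSize;
    private final Queue<Runnable> mailbox;
    private final List<String> output;

    MiniBatchReaderSketch(
            Iterator<String> split, int batchSize, Queue<Runnable> mailbox, List<String> output) {
        this.split = split;
        this.batchSize = batchSize;
        this.mailbox = mailbox;
        this.output = output;
    }

    /** Emits at most batchSize records, then yields by re-enqueueing itself if records remain. */
    void readBatch() {
        int emitted = 0;
        while (emitted < batchSize && split.hasNext()) {
            output.add(split.next());
            emitted++;
        }
        if (split.hasNext()) {
            mailbox.add(this::readBatch);
        }
    }

    /** Runs a five-record split with batch size 2 and one barrier mail queued behind the reader. */
    static List<String> demo() {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        List<String> output = new ArrayList<>();
        Iterator<String> split = List.of("r1", "r2", "r3", "r4", "r5").iterator();
        MiniBatchReaderSketch reader = new MiniBatchReaderSketch(split, 2, mailbox, output);

        mailbox.add(reader::readBatch);
        mailbox.add(() -> output.add("checkpoint-barrier"));

        // Single-threaded mailbox loop: each mail runs to completion before the next one.
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
        return output;
    }
}
```

In this model `demo()` yields r1, r2, checkpoint-barrier, r3, r4, r5: the barrier is handled after the first batch instead of after the whole split.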





[jira] [Created] (FLINK-33751) use modules correctly when deserializing json plan

2023-12-05 Thread shuaiqi.guo (Jira)
shuaiqi.guo created FLINK-33751:
---

 Summary: use modules correctly when deserializing json plan
 Key: FLINK-33751
 URL: https://issues.apache.org/jira/browse/FLINK-33751
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: shuaiqi.guo


When serializing and deserializing a SQL job with the following SQL syntax:
{code:java}
COMPILE PLAN ...;
EXECUTE PLAN ...;{code}
if there are two modules in the environment, some bugs appear when calling 
lookupOptionalSqlOperator():
 # if 2 operators are found, it returns empty;
 # foundOperators is not ordered by module order.
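
One possible reading of the desired behaviour is "the first match in module order wins". The sketch below illustrates only that rule. It does not use the planner's classes; the lookup method, the map of module names to function names and the result format are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Illustration of order-aware resolution; not the Flink planner code.
final class ModuleOrderLookupSketch {

    /**
     * Walks the modules in their configured order and returns the first one that
     * provides the function, instead of returning empty when several modules match.
     */
    static Optional<String> lookup(
            List<String> moduleOrder, Map<String, Set<String>> functionsByModule, String function) {
        for (String module : moduleOrder) {
            Set<String> functions = functionsByModule.get(module);
            if (functions != null && functions.contains(function)) {
                return Optional.of(module + "." + function);
            }
        }
        return Optional.empty();
    }
}
```

With the module order [hive, core] and both modules providing `concat`, the lookup resolves to the hive variant rather than to an empty result.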

 





Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
Hi Gyula, in my opinion this approach still uses the FlinkDeployment resource 
field to set jobmanager.memory.process.size, and I have already described a case 
it does not cover:

A user wants to define a FlinkDeployment whose JobManager container has 1G of 
memory, but configure jobmanager.memory.process.size as 850m. The proposed 
solution only improves the user-facing configuration. It does make the config more 
intuitive and easier, but it does not decouple the container resources from the 
Flink configuration.

So from my side, I think we need to add new configuration to support this 
proposal, and it needs more discussion.

Thanks
Chaoran Su


> On Dec 5, 2023, at 18:28, Gyula Fóra wrote:
> 
> This is the proposal according to FLINK-33548:
> 
> spec:
>   taskManager:
>     resources:
>       requests:
>         memory: "64Mi"
>         cpu: "250m"
>       limits:
>         memory: "128Mi"
>         cpu: "500m"
> 
> I honestly think this is much more intuitive and easier than using the
> podTemplate, which is very complex immediately.
> Please tell me what use-case/setup is not covered by this improved spec.
> 
> Unless there is a big limitation here I am still -1 for modifying the
> podTemplate logic and +1 for continuing with FLINK-33548
> 
> Gyula
> 
> 
> 
> On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
> surendralilh...@gmail.com> wrote:
> 
>> Hi Gyula,
>> 
>> FLINK-33548 proposes adding a new resource field to match with Kubernetes
>> pod resource configuration. Here's my suggestion: instead of adding a new
>> resource field, let's use a pod template for more advanced resource setup.
>> Adding a new resource field might confuse users. This change can also help
>> with issues when users use Flink Kubernetes commands directly, without the
>> operator.
>> 
>> Thanks
>> Surendra
>> 
>> 
>> On Tue, Dec 5, 2023 at 3:10 PM richard.su  wrote:
>> 
>>> Sorry Gyula,  let me explain more about the point of 2, if I avoid the
>>> override, I will got a jobmanager pod still with resources consist with
>>> “jobmanager.memory.process.size”, but a flinkdeployment with a resource
>>> larger than that.
>>> 
>>> Thanks for your time.
>>> Richard Su
>>> 
 On Dec 5, 2023, at 17:13, richard.su wrote:
 
 Thank you for your time, Gyula, I have more question about Flink-33548,
>>> we can have more discussion about this and make progress:
 
 1. I agree with you about declaring resources in FlinkDeployment
>>> resource sections. But Flink Operator will override the
>>> “jobmanager.memory.process.size”  and "taskmanager.memory.process.size",
>>> despite I have set these configuration or not in flink configuration. If
>>> user had configured all memory attributes, the override will leads to
>> error
>>> as the overall computation is error.
 
 the code of override is in FlinkConfigManager.class in buildFrom
>> method,
>>> which apply to JobmanagerSpec and TaskManagerSpec.
 
 2. If I modified the code of override, I will still encounter this
>> issue
>>> of FLINK-24150, because I only modified the code of flink operator but
>> not
>>> flink-kubernetes package, so I will make a pod resources like (cpu:1c
>>> memory:1g) and container resource to be (cpu:1c, memory 850m), because I
>>> already set jobmanager.memory.process.size to 850m.
 
 3. because of there two point, we need to make the podTemplate have
>>> higher priority. Otherwise we can refactor the code of flink operator,
>>> which should import something new configuration to support the native
>> mode.
 
 I think it will be better to import some configuration, which
>>> FlinkConfigManager.class can override it using the resource of
>>> JobmanagerSpec and TaskManagerSpec.
 
 When it deep into the code flink-kubernetes package, we using these new
>>> configuration as the final result of containers resources.
 
 Thanks for your time.
 Richard Su
 
> On Dec 5, 2023, at 16:45, Gyula Fóra wrote:
> 
> As you can see in the jira ticket there hasn't been any progress,
>> nobody
> started to work on this yet.
> 
> I personally don't think it's confusing to declare resources in the
> FlinkDeployment resource sections. It's well documented and worked
>> very
> well so far for most users.
> This is pretty common practice for kubernetes.
> 
> Cheers,
> Gyula
> 
> On Tue, Dec 5, 2023 at 9:35 AM richard.su 
>>> wrote:
> 
>> Hi, Gyula, is there had any progress in FLINK-33548? I would like to
>>> join
>> the discussion but I haven't seen any discussion in the url.
>> 
>> I also make flinkdeployment by flink operator, which indeed will
>>> override
>> the process size by TaskmanagerSpec.resources or
>>> JobmanagerSpec.resources,
>> which really confused, I had modified the code of flink operator to
>>> avoid
>> the override.
>> 
>> Looking for your response.
>> 
>> Thank you.
>> Richard Su
>> 
>> 
>>> On Dec 5, 2023, at 16:22, Gyula Fóra wrote:
>>> 
>>> Hi!

[FLINK-32028] connectors/elasticsearch: error handling

2023-12-05 Thread Peter Fischer
Hi!

We're using Flink and its Elasticsearch (ES) sink to process content
changes at Wikimedia. The connector uses ES's bulk API but is rather strict
when it comes to interpreting a response: it fails if a single action of
the bulk request has failed. This behavior is not configurable at the moment, so I
would like to propose an improvement to overcome this limitation:

Ticket: https://issues.apache.org/jira/browse/FLINK-32028
Pull-Request:
https://github.com/apache/flink-connector-elasticsearch/pull/83
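
To illustrate what a configurable policy for partially failed bulk responses could look like, here is a small sketch. It is not the API proposed in the pull request and not the connector's code. The types, the status threshold and the predicate-based policy are assumptions made for this example.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of per-item bulk failure handling; not the connector's API.
final class BulkFailureSketch {

    /** One action result from a bulk response, reduced to an id and a status code. */
    static final class ItemResult {
        final String id;
        final int status;

        ItemResult(String id, int status) {
            this.id = id;
            this.status = status;
        }

        boolean failed() {
            return status >= 400;
        }
    }

    /**
     * Fails on the first failed item that the policy does not tolerate.
     * Returns the number of failed items that were tolerated.
     */
    static int handle(List<ItemResult> items, Predicate<ItemResult> tolerate) {
        int tolerated = 0;
        for (ItemResult item : items) {
            if (!item.failed()) {
                continue;
            }
            if (!tolerate.test(item)) {
                throw new IllegalStateException(
                        "Bulk item " + item.id + " failed with status " + item.status);
            }
            tolerated++;
        }
        return tolerated;
    }
}
```

A strict policy (`item -> false`) reproduces the current all-or-nothing behaviour, while a policy such as `item -> item.status == 409` would let version conflicts pass and still fail on anything else.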

I'm looking forward to getting feedback on this proposal.

Peter


Re: [DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-05 Thread Xintong Song
@Paul,


Do you mean the scenario where users call `env.execute()` multiple times in
the `main()` method? I believe that is not supported currently when HA is
enabled, for the exact same reason you mentioned: Flink is not aware of
which jobs have been executed and which have not.


On the other hand, if an external scheduler is used to submit multiple jobs
to a session cluster, Flink already has a JobResultStore for persisting
information about successfully completed jobs, so that only incomplete
jobs will be recovered. See FLIP-194[1] for more details.
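
The effect of such a result store on recovery can be sketched as a simple filter. This is only a model of the idea; it does not use the actual JobResultStore interface, and the class, method and job ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of "recover only jobs without a persisted result".
final class RecoveryFilterSketch {

    /** Returns the submitted jobs, in submission order, that have no completed result recorded. */
    static List<String> jobsToRecover(List<String> submittedJobIds, Set<String> completedJobIds) {
        List<String> toRecover = new ArrayList<>();
        for (String jobId : submittedJobIds) {
            if (!completedJobIds.contains(jobId)) {
                toRecover.add(jobId);
            }
        }
        return toRecover;
    }
}
```

For submitted jobs job-1, job-2 and job-3 with job-2 recorded as completed, only job-1 and job-3 would be recovered.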


Best,

Xintong


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore

On Tue, Dec 5, 2023 at 6:01 PM Xintong Song  wrote:

> Thanks for addressing my comments, Lijie. LGTM
>
> Best,
>
> Xintong
>
>
>
> On Tue, Dec 5, 2023 at 2:56 PM Paul Lam  wrote:
>
>> Hi Lijie,
>>
>> Recovery for batch jobs is no doubt a long-awaited feature. Thanks for
>> the proposal!
>>
>> I’m concerned about the multi-job scenario. In session mode, users could
>> use web submission to upload and run jars which may produce multiple
>> Flink jobs. However, these jobs may not be submitted at once and run in
>> parallel. Instead, they could be dependent on other jobs like a DAG. The
>> schedule of the jobs is controlled by the user's main method.
>>
>> IIUC, in the FLIP, the main method is lost after the recovery, and only
>> submitted jobs would be recovered. Is that right?
>>
>> Best,
>> Paul Lam
>>
>> > On Nov 2, 2023, at 18:00, Lijie Wang wrote:
>> >
>> > Hi devs,
>> >
>> > Zhu Zhu and I would like to start a discussion about FLIP-383: Support
>> Job
>> > Recovery for Batch Jobs[1]
>> >
>> > Currently, when Flink’s job manager crashes or gets killed, possibly
>> due to
>> > unexpected errors or planned nodes decommission, it will cause the
>> > following two situations:
>> > 1. Failed, if the job does not enable HA.
>> > 2. Restart, if the job enable HA. If it’s a streaming job, the job will
>> be
>> > resumed from the last successful checkpoint. If it’s a batch job, it
>> has to
>> > run from beginning, all previous progress will be lost.
>> >
>> > In view of this, we think the JM crash may cause great regression for
>> batch
>> > jobs, especially long running batch jobs. This FLIP is mainly to solve
>> this
>> > problem so that batch jobs can recover most job progress after JM
>> crashes.
>> > In this FLIP, our goal is to let most finished tasks not need to be
>> re-run.
>> >
>> > You can find more details in the FLIP-383[1]. Looking forward to your
>> > feedback.
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
>> >
>> > Best,
>> > Lijie
>>
>>


Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread Gyula Fóra
This is the proposal according to FLINK-33548:

spec:
  taskManager:
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "128Mi"
        cpu: "500m"

I honestly think this is much more intuitive and easier than using the
podTemplate, which gets very complex immediately.
Please tell me what use-case/setup is not covered by this improved spec.

Unless there is a big limitation here, I am still -1 for modifying the
podTemplate logic and +1 for continuing with FLINK-33548.

Gyula



On Tue, Dec 5, 2023 at 11:16 AM Surendra Singh Lilhore <
surendralilh...@gmail.com> wrote:

> Hi Gyula,
>
> FLINK-33548 proposes adding a new resource field to match with Kubernetes
> pod resource configuration. Here's my suggestion: instead of adding a new
> resource field, let's use a pod template for more advanced resource setup.
> Adding a new resource field might confuse users. This change can also help
> with issues when users use Flink Kubernetes commands directly, without the
> operator.
>
> Thanks
> Surendra
>
>
> On Tue, Dec 5, 2023 at 3:10 PM richard.su  wrote:
>
> > Sorry Gyula,  let me explain more about the point of 2, if I avoid the
> > override, I will got a jobmanager pod still with resources consist with
> > “jobmanager.memory.process.size”, but a flinkdeployment with a resource
> > larger than that.
> >
> > Thanks for your time.
> > Richard Su
> >
> > > On Dec 5, 2023, at 17:13, richard.su wrote:
> > >
> > > Thank you for your time, Gyula, I have more question about Flink-33548,
> > we can have more discussion about this and make progress:
> > >
> > > 1. I agree with you about declaring resources in FlinkDeployment
> > resource sections. But Flink Operator will override the
> > “jobmanager.memory.process.size”  and "taskmanager.memory.process.size",
> > despite I have set these configuration or not in flink configuration. If
> > user had configured all memory attributes, the override will leads to
> error
> > as the overall computation is error.
> > >
> > > the code of override is in FlinkConfigManager.class in buildFrom
> method,
> > which apply to JobmanagerSpec and TaskManagerSpec.
> > >
> > > 2. If I modified the code of override, I will still encounter this
> issue
> > of FLINK-24150, because I only modified the code of flink operator but
> not
> > flink-kubernetes package, so I will make a pod resources like (cpu:1c
> > memory:1g) and container resource to be (cpu:1c, memory 850m), because I
> > already set jobmanager.memory.process.size to 850m.
> > >
> > > 3. because of there two point, we need to make the podTemplate have
> > higher priority. Otherwise we can refactor the code of flink operator,
> > which should import something new configuration to support the native
> mode.
> > >
> > > I think it will be better to import some configuration, which
> > FlinkConfigManager.class can override it using the resource of
> > JobmanagerSpec and TaskManagerSpec.
> > >
> > > When it deep into the code flink-kubernetes package, we using these new
> > configuration as the final result of containers resources.
> > >
> > > Thanks for your time.
> > > Richard Su
> > >
> > >> On Dec 5, 2023, at 16:45, Gyula Fóra wrote:
> > >>
> > >> As you can see in the jira ticket there hasn't been any progress,
> nobody
> > >> started to work on this yet.
> > >>
> > >> I personally don't think it's confusing to declare resources in the
> > >> FlinkDeployment resource sections. It's well documented and worked
> very
> > >> well so far for most users.
> > >> This is pretty common practice for kubernetes.
> > >>
> > >> Cheers,
> > >> Gyula
> > >>
> > >> On Tue, Dec 5, 2023 at 9:35 AM richard.su 
> > wrote:
> > >>
> > >>> Hi, Gyula, is there had any progress in FLINK-33548? I would like to
> > join
> > >>> the discussion but I haven't seen any discussion in the url.
> > >>>
> > >>> I also make flinkdeployment by flink operator, which indeed will
> > override
> > >>> the process size by TaskmanagerSpec.resources or
> > JobmanagerSpec.resources,
> > >>> which really confused, I had modified the code of flink operator to
> > avoid
> > >>> the override.
> > >>>
> > >>> Looking for your response.
> > >>>
> > >>> Thank you.
> > >>> Richard Su
> > >>>
> > >>>
> >  On Dec 5, 2023, at 16:22, Gyula Fóra wrote:
> > 
> >  Hi!
> > 
> >  Please see the discussion in
> >  https://lists.apache.org/thread/6p5tk6obmk1qxf169so498z4vk8cg969
> >  and the ticket: https://issues.apache.org/jira/browse/FLINK-33548
> > 
> >  We should follow the approach outlined there. If you are interested
> > you
> > >>> are
> >  welcome to pick up the operator ticket.
> > 
> >  Unfortunately your PR can be a large unexpected change to existing
> > users
> > >>> so
> >  we should not add it.
> > 
> >  Cheers,
> >  Gyula
> > 
> >  On Tue, Dec 5, 2023 at 9:05 AM 苏超腾  wrote:
> > 
> > > Hello everyone,
> > >
> > > I've encountered an issue while using flink kubernetes native,
> 

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread Surendra Singh Lilhore
Hi Gyula,

FLINK-33548 proposes adding a new resource field to match with Kubernetes
pod resource configuration. Here's my suggestion: instead of adding a new
resource field, let's use a pod template for more advanced resource setup.
Adding a new resource field might confuse users. This change can also help
with issues when users use Flink Kubernetes commands directly, without the
operator.

Thanks
Surendra


On Tue, Dec 5, 2023 at 3:10 PM richard.su  wrote:

> Sorry Gyula,  let me explain more about the point of 2, if I avoid the
> override, I will got a jobmanager pod still with resources consist with
> “jobmanager.memory.process.size”, but a flinkdeployment with a resource
> larger than that.
>
> Thanks for your time.
> Richard Su
>
> > On Dec 5, 2023, at 17:13, richard.su wrote:
> >
> > Thank you for your time, Gyula, I have more question about Flink-33548,
> we can have more discussion about this and make progress:
> >
> > 1. I agree with you about declaring resources in FlinkDeployment
> resource sections. But Flink Operator will override the
> “jobmanager.memory.process.size”  and "taskmanager.memory.process.size",
> despite I have set these configuration or not in flink configuration. If
> user had configured all memory attributes, the override will leads to error
> as the overall computation is error.
> >
> > the code of override is in FlinkConfigManager.class in buildFrom method,
> which apply to JobmanagerSpec and TaskManagerSpec.
> >
> > 2. If I modified the code of override, I will still encounter this issue
> of FLINK-24150, because I only modified the code of flink operator but not
> flink-kubernetes package, so I will make a pod resources like (cpu:1c
> memory:1g) and container resource to be (cpu:1c, memory 850m), because I
> already set jobmanager.memory.process.size to 850m.
> >
> > 3. because of there two point, we need to make the podTemplate have
> higher priority. Otherwise we can refactor the code of flink operator,
> which should import something new configuration to support the native mode.
> >
> > I think it will be better to import some configuration, which
> FlinkConfigManager.class can override it using the resource of
> JobmanagerSpec and TaskManagerSpec.
> >
> > When it deep into the code flink-kubernetes package, we using these new
> configuration as the final result of containers resources.
> >
> > Thanks for your time.
> > Richard Su
> >
> >> 2023年12月5日 16:45,Gyula Fóra  写道:
> >>
> >> As you can see in the jira ticket there hasn't been any progress, nobody
> >> started to work on this yet.
> >>
> >> I personally don't think it's confusing to declare resources in the
> >> FlinkDeployment resource sections. It's well documented and worked very
> >> well so far for most users.
> >> This is pretty common practice for kubernetes.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> On Tue, Dec 5, 2023 at 9:35 AM richard.su 
> wrote:
> >>
> >>> Hi, Gyula, is there had any progress in FLINK-33548? I would like to
> join
> >>> the discussion but I haven't seen any discussion in the url.
> >>>
> >>> I also make flinkdeployment by flink operator, which indeed will
> override
> >>> the process size by TaskmanagerSpec.resources or
> JobmanagerSpec.resources,
> >>> which really confused, I had modified the code of flink operator to
> avoid
> >>> the override.
> >>>
> >>> Looking for your response.
> >>>
> >>> Thank you.
> >>> Richard Su
> >>>
> >>>
>  2023年12月5日 16:22,Gyula Fóra  写道:
> 
>  Hi!
> 
>  Please see the discussion in
>  https://lists.apache.org/thread/6p5tk6obmk1qxf169so498z4vk8cg969
>  and the ticket: https://issues.apache.org/jira/browse/FLINK-33548
> 
>  We should follow the approach outlined there. If you are interested
> you
> >>> are
>  welcome to pick up the operator ticket.
> 
>  Unfortunately your PR can be a large unexpected change to existing
> users
> >>> so
>  we should not add it.
> 
>  Cheers,
>  Gyula
> 
>  On Tue, Dec 5, 2023 at 9:05 AM 苏超腾  wrote:
> 
> > Hello everyone,
> >
> > I've encountered an issue while using flink kubernetes native,
> Despite
> > setting resource limits in the pod template, it appears that these
> >>> limits
> > and requests are not considered during JobManager(JM) and TaskManager
> >>> (TM)
> > pod deployment.
> >
> > I find the a issue had opened in jira  FLINK-24150, which introduced
> > almost the same questions that I encountered.
> >
> > I agrees that if user had provided pod templates, we should put
> priority
> > on it higher than flink calculated from configuration.
> >
> > But this need some discussion in our community, because it related
> some
> > scenarios:
> > If I want to create a pod with Graranted QoS and want the memory of
> the
> > Flink main container to be larger than the process size of Flink, I
> >>> cannot
> > directly modify podTemplate (although we can use limit factor, this
> will
> 

Re: [DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-05 Thread Xintong Song
Thanks for addressing my comments, Lijie. LGTM

Best,

Xintong



On Tue, Dec 5, 2023 at 2:56 PM Paul Lam  wrote:

> Hi Lijie,
>
> Recovery for batch jobs is no doubt a long-awaited feature. Thanks for
> the proposal!
>
> I’m concerned about the multi-job scenario. In session mode, users could
> use web submission to upload and run jars which may produce multiple
> Flink jobs. However, these jobs may not be submitted at once and run in
> parallel. Instead, they could be dependent on other jobs like a DAG. The
> schedule of the jobs is controlled by the user's main method.
>
> IIUC, in the FLIP, the main method is lost after the recovery, and only
> submitted jobs would be recovered. Is that right?
>
> Best,
> Paul Lam
>
> > On Nov 2, 2023, at 18:00, Lijie Wang wrote:
> >
> > Hi devs,
> >
> > Zhu Zhu and I would like to start a discussion about FLIP-383: Support
> > Job Recovery for Batch Jobs[1]
> >
> > Currently, when Flink’s job manager crashes or gets killed, possibly due
> > to unexpected errors or planned node decommissioning, it will cause one
> > of the following two situations:
> > 1. Failed, if the job does not enable HA.
> > 2. Restart, if the job enables HA. If it’s a streaming job, the job will
> > be resumed from the last successful checkpoint. If it’s a batch job, it
> > has to run from the beginning, and all previous progress will be lost.
> >
> > In view of this, we think a JM crash may cause great regression for
> > batch jobs, especially long-running batch jobs. This FLIP is mainly to
> > solve this problem so that batch jobs can recover most job progress
> > after JM crashes. In this FLIP, our goal is to let most finished tasks
> > not need to be re-run.
> >
> > You can find more details in the FLIP-383[1]. Looking forward to your
> > feedback.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
> >
> > Best,
> > Lijie
>
>


Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
Sorry Gyula, let me explain point 2 in more detail: if I avoid the
override, I will still get a JobManager pod whose resources are consistent with
“jobmanager.memory.process.size”, but a FlinkDeployment with a resource larger
than that.

Thanks for your time.
Richard Su


Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
Thank you for your time, Gyula. I have more questions about FLINK-33548; we can
discuss this further and make progress:

1. I agree with you about declaring resources in the FlinkDeployment resource
sections. But the Flink Operator will override “jobmanager.memory.process.size”
and "taskmanager.memory.process.size" regardless of whether I have set them in
the Flink configuration. If the user has configured all memory attributes, the
override leads to an error because the overall memory computation no longer
adds up.

The override code is in the buildFrom method of FlinkConfigManager.class, which
applies it to JobmanagerSpec and TaskManagerSpec.

2. If I modify the override code, I will still run into the FLINK-24150 issue,
because I only modified the Flink operator code and not the flink-kubernetes
package. So I would end up with a pod resource like (cpu: 1c, memory: 1g) and a
container resource of (cpu: 1c, memory: 850m), because I already set
jobmanager.memory.process.size to 850m.

3. Because of these two points, we need to give the podTemplate higher
priority. Otherwise we can refactor the Flink operator code, which should
introduce some new configuration to support native mode.

I think it would be better to introduce some new configuration that
FlinkConfigManager.class can override using the resources of JobmanagerSpec
and TaskManagerSpec.

Deeper in the flink-kubernetes package code, we would then use this new
configuration as the final container resources.
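
As a rough illustration of the precedence proposed in point 3 (a minimal sketch
only: `ResourcePrecedenceSketch` and `resolveMemoryMiB` are hypothetical names,
not code from the Flink operator or the flink-kubernetes package), the idea is
that an explicitly set pod template value wins, and the value derived from the
process size is only a fallback:

```java
import java.util.Optional;

// Hypothetical sketch of the precedence discussed in point 3.
// None of these names exist in Flink or the Flink Kubernetes operator.
final class ResourcePrecedenceSketch {

    /**
     * Returns the pod template memory if the user set one,
     * otherwise the value derived from the Flink process size.
     */
    static long resolveMemoryMiB(Optional<Long> podTemplateMemoryMiB, long processSizeMiB) {
        return podTemplateMemoryMiB.orElse(processSizeMiB);
    }

    public static void main(String[] args) {
        // Pod template asks for 1g while jobmanager.memory.process.size is 850m.
        System.out.println(resolveMemoryMiB(Optional.of(1024L), 850L));
        // No pod template value: fall back to the process size.
        System.out.println(resolveMemoryMiB(Optional.empty(), 850L));
    }
}
```

With the numbers from point 2, the 1g pod template value would then be used for
the container instead of the 850m derived from jobmanager.memory.process.size.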

Thanks for your time.
Richard Su




Re: Subscribe Apache Flink development email.

2023-12-05 Thread Hang Ruan
Hi, aaron.

If you want to subscribe to the dev mailing list, you need to send an e-mail to
dev-subscr...@flink.apache.org . See more in [1].
The mailing lists can be found here[2].

Best,
Hang

[1]
https://flink.apache.org/what-is-flink/community/#how-to-subscribe-to-a-mailing-list
[2] https://flink.apache.org/what-is-flink/community/#mailing-lists


On Tue, Dec 5, 2023 at 14:48, aaron ai wrote:

> Subscribe Apache Flink development email.
>


Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread Gyula Fóra
As you can see in the jira ticket there hasn't been any progress, nobody
started to work on this yet.

I personally don't think it's confusing to declare resources in the
FlinkDeployment resource sections. It's well documented and worked very
well so far for most users.
This is pretty common practice for kubernetes.

Cheers,
Gyula



RE: Re: [DISCUSS] Proposing an LTS Release for the 1.x Line

2023-12-05 Thread Payne, Julian
Hey all,
Thanks for this proposal, I think it makes a lot of sense. I am curious to
know what time horizon we would consider for LTS of 1.x. Customers value
knowing when versions will be deprecated so they can build migration into their
planning and resourcing cycles, so I would be in favour of being transparent
about how long the community will support 1.x.

Thanks


Julian

On 2023/07/26 14:16:43 Konstantin Knauf wrote:
> Hi Jing,
>
> > How could we help users and avoid this happening?
>
> I don't think we will be able to avoid this in all cases. And I think
> that's ok. It's always a trade-off between supporting new use cases and
> moving the project forward, and backwards compatibility (in a broad sense).
> For example, we dropped Mesos support in a minor release in the past. If
> your only option for running Flink was Mesos, you were stuck on Flink
> 1.13 or so.
>
> So, I think, it is in the end a case-by-case decision. How big is the cost
> of continuing to support a "legacy feature/system", and how many users are
> affected, and to which degree, by dropping it?
>
> Best,
>
> Konstantin
>
>
> On Tue, Jul 25, 2023 at 6:34 PM Jing Ge wrote:
>
> > Hi Konstantin,
> >
> > I might have not made myself clear enough, apologies. The
> > source-/sink-function was used as a concrete example to discuss the pattern
> > before we decided to offer LTS. The intention was not to hijack this thread
> > to discuss how to deprecate them.
> >
> > We all wish that the only thing users need to migrate from Flink 1.x to 2.0
> > is some code changes in their repos and we all wish users will migrate, if
> > LTS has long enough support time. But the question I tried to discuss is
> > not the wish but the "How?". We might be able to toss the high migration
> > effort aside (we shouldn't), since it is theoretically still doable if users
> > have long enough time, even if the effort is extremely high. Another
> > concern is that if "function regressions" are allowed in 2.0, i.e. if 2.0
> > lacks functionality or has bugs compared to 1.x, there will be no way
> > for users to do the migration regardless of whether we encourage them to
> > migrate or they have been given enough time (how long is enough?) because
> > LTS has been offered. How could we help users and avoid this happening?
> >
> > Best regards,
> > Jing
> >
> > On Tue, Jul 25, 2023 at 6:57 PM Konstantin Knauf wrote:
> >
> > > Hi Jing,
> > >
> > > let's not overindex on the Source-/SinkFunction discussion in this
> > > thread.
> > >
> > > We will generally drop/break a lot of APIs in Flink 2.0. So, naturally
> > > users will need to make more changes to their code in order to migrate
> > > from 1.x to Flink 2.0. In order to give them more time to do this, we
> > > support the last Flink 1.x release for a longer time with bug fix
> > > releases.
> > >
> > > Of course, we still encourage users to migrate to Flink 2.0, because at
> > > some point, we will stop supporting Flink 1.x. For example, if we followed
> > > Marton's proposal we would support Flink 1.x LTS for about 2 years
> > > (roughly 4 minor release cycles) instead of about 1 year (2 minor release
> > > cycles) for regular minor releases. This seems like a reasonable
> > > timeframe to me. It also gives us more time to discover and address
> > > blockers in migrating to Flink 2.x that we are not aware of right now.
> > >
> > > Best,
> > >
> > > Konstantin
> > >
> > > On Tue, Jul 25, 2023 at 12:48 PM Jing Ge wrote:
> > >
> > > > Hi all,
> > > >
> > > > Overall, it is a good idea to provide the LTS release, but I'd like to
> > > > reference a concrete case as an example to understand what
> > > > restrictions the LTS should have.
> > > >
> > > > Hypothetically, Source-/Sink-Function have been deprecated in 1.x LTS
> > > > and removed in 2.0 and the issues[1] are not solved in 2.0. This is a
> > > > typical scenario in which the old APIs are widely used in 1.x LTS and
> > > > the new APIs in 2.0 are not yet ready to take over all users. We will
> > > > have the following questions:
> > > >
> > > > 1. Is this scenario allowed at all? Do we all agree that there could be
> > > > some features/functionalities that only work in 1.x LTS after 2.0 has
> > > > been released?
> > > > 2. How long are we going to support 1.x LTS? 1 year? 2 years? As long
> > > > as the issues that block users from migrating to 2.0 are not solved,
> > > > we can't stop the LTS support, even if the predefined support time
> > > > expires.
> > > > 3. What is the intention to release a new version with (or without)
> > > > LTS? Do we still want to engage users to migrate to the new release
> > > > asap? If the old APIs in 1.x LTS offer more than the new APIs in 2.0,
> > > > or it is almost impossible to migrate, double effort will be required
> > > > to maintain those major releases for a very long time. We will be
> > > > facing many cohorts.
> > > >
> > > > IMHO, we should be clear with those 

Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread richard.su
Hi Gyula, has there been any progress on FLINK-33548? I would like to join the
discussion, but I haven't seen any discussion at the URL.

I also create FlinkDeployments with the Flink operator, which indeed overrides
the process size with TaskmanagerSpec.resources or JobmanagerSpec.resources.
This is really confusing, so I modified the Flink operator code to avoid the
override.

Looking forward to your response.

Thank you.
Richard Su





Re: Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread Gyula Fóra
Hi!

Please see the discussion in
https://lists.apache.org/thread/6p5tk6obmk1qxf169so498z4vk8cg969
and the ticket: https://issues.apache.org/jira/browse/FLINK-33548

We should follow the approach outlined there. If you are interested you are
welcome to pick up the operator ticket.

Unfortunately your PR can be a large unexpected change to existing users so
we should not add it.

Cheers,
Gyula



Discussion: [FLINK-24150] Support to configure cpu resource request and limit in pod template

2023-12-05 Thread 苏超腾
Hello everyone,

I've encountered an issue while using Flink's native Kubernetes integration.
Despite setting resource limits in the pod template, it appears that these
limits and requests are not considered during JobManager (JM) and TaskManager
(TM) pod deployment.

I found that an issue had already been opened in Jira, FLINK-24150, which
raises almost the same questions that I encountered.

I agree that if the user has provided pod templates, we should give them higher
priority than the values Flink calculates from its configuration.

But this needs some discussion in our community, because it relates to some
scenarios:
1. If I want to create a pod with Guaranteed QoS and want the memory of the
Flink main container to be larger than the process size of Flink, I cannot
directly modify the podTemplate (although we can use the limit factor, this
will cause the QoS to change from Guaranteed to Burstable).
2. If I want to create a pod with Burstable QoS, I don't want to use the limit
factor and want to directly configure the request to be 50% of the limit,
which cannot be modified.

In order to meet these scenarios, I have submitted a pull request:
https://github.com/apache/flink/pull/23872

This code is very simple and just needs someone to review it. This PR can be
cherry-picked to older versions, which will be helpful.
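
To make the QoS point in the scenarios above concrete, here is a minimal
sketch (an illustration only, not code from the pull request; `QosSketch`,
`classify` and `limitFor` are hypothetical names). It assumes a single
container and assumes the limit factor derives the limit as request * factor.
It follows the Kubernetes rule that a pod is Guaranteed only when cpu and
memory each have a request equal to the limit:

```java
import java.util.Map;

// Illustrative sketch only; not Flink or Kubernetes client code.
final class QosSketch {

    /**
     * Simplified single-container version of the Kubernetes QoS rules:
     * BestEffort when nothing is set, Guaranteed when cpu and memory each
     * have a limit and a request equal to it, Burstable otherwise.
     */
    static String classify(Map<String, Double> requests, Map<String, Double> limits) {
        if (requests.isEmpty() && limits.isEmpty()) {
            return "BestEffort";
        }
        for (String resource : new String[] {"cpu", "memory"}) {
            Double limit = limits.get(resource);
            if (limit == null) {
                return "Burstable";
            }
            // Kubernetes defaults a missing request to the limit.
            Double request = requests.getOrDefault(resource, limit);
            if (!request.equals(limit)) {
                return "Burstable";
            }
        }
        return "Guaranteed";
    }

    /** Assumed limit-factor behaviour: limit = request * factor. */
    static double limitFor(double request, double limitFactor) {
        return request * limitFactor;
    }

    public static void main(String[] args) {
        double cpu = 1.0;
        double memoryMiB = 1024.0;
        // Factor 1.0: request == limit for both resources.
        System.out.println(classify(
                Map.of("cpu", cpu, "memory", memoryMiB),
                Map.of("cpu", limitFor(cpu, 1.0), "memory", limitFor(memoryMiB, 1.0))));
        // Memory factor 1.5: the limit exceeds the request.
        System.out.println(classify(
                Map.of("cpu", cpu, "memory", memoryMiB),
                Map.of("cpu", cpu, "memory", limitFor(memoryMiB, 1.5))));
    }
}
```

Under these assumptions the first call yields Guaranteed and the second
Burstable, which is why raising the container memory through a factor larger
than 1 cannot keep a pod in the Guaranteed class.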


I would appreciate any feedback on this.

Thank you for your time and contributions to the Flink project.

Thank you,
chaoran.su