Re: Flink native k8s integration vs. operator

2022-01-13 Thread Xintong Song
Thanks for volunteering to drive this effort, Marton, Thomas and Gyula.

Looking forward to the public discussion. Please feel free to reach out if
there's anything you need from us.

Thank you~

Xintong Song



On Fri, Jan 14, 2022 at 8:27 AM Chenya Zhang 
wrote:

> Thanks Thomas, Gyula, and Marton for driving this effort! It would greatly
> ease the adoption of Apache Flink on Kubernetes and help to address the
> current operational pain points as mentioned. Look forward to the proposal
> and more discussions!
>
> Best,
> Chenya
>
> On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi 
> wrote:
>
>> Hi All,
>>
>> I am pleased to see the level of enthusiasm and technical consideration
>> already emerging in this thread. I wholeheartedly support building an
>> operator and endorsing it via placing it under the Apache Flink umbrella
>> (as a separate repository) as the current lack of it is clearly becoming
>> an
>> adoption bottleneck for large scale Flink users. The next logical step is
>> to write a FLIP to agree on the technical details, so that we can put
>> forward the proposal to the Flink PMC for creating a new repository with a
>> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
>> initial wording on the proposal which we will put up for public discussion
>> in the coming weeks.
>>
>> Best,
>> Marton
>>
>> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
>> wrote:
>>
>> > Hi Thomas,
>> >
>> > Yes, I was referring to a separate repository under Apache Flink.
>> >
>> > Cheers,
>> >
>> > Konstantin
>> >
>> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> Thanks for the feedback and discussion. A few additional thoughts:
>> >>
>> >> [Konstantin] > With respect to common lifecycle management operations:
>> >> these features are
>> >> > not available (within Apache Flink) for any of the other resource
>> >> providers
>> >> > (YARN, Standalone) either. From this perspective, I wouldn't consider
>> >> this
>> >> > a shortcoming of the Kubernetes integration.
>> >>
>> >> I think time and evolution of the ecosystem are factors to consider as
>> >> well. The state and usage of Flink was much different when YARN
>> >> integration was novel. Expectations are different today and the
>> >> lifecycle functionality provided by an operator may as well be
>> >> considered essential to support the concept of a Flink application on
>> >> k8s. After a few years of learning from operator experience outside of
>> >> Flink, it might be a good time to fill the gap.
>> >>
>> >> [Konstantin] > I still believe that we should keep this focus on low
>> >> > level composable building blocks (like Jobs and Snapshots) in Apache
>> >> Flink
>> >> > to make it easy for everyone to build fitting higher level
>> abstractions
>> >> > like a FlinkApplication Custom Resource on top of it.
>> >>
>> >> I completely agree that it is important that the basic functions of
>> >> Flink are solid and continued focus is necessary. Thanks for sharing
>> >> the pointers, these are great improvements. At the same time,
>> >> ecosystem, contributor base and user spectrum are growing. There have
>> >> been significant additions in many areas of Flink including connectors
>> >> and higher level abstractions like statefun, SQL and Python. It's also
>> >> evident from additional repositories/subprojects that we have in Flink
>> >> today.
>> >>
>> >> [Konstantin] > Having said this, if others in the community have the
>> >> capacity to push and
>> >> > *maintain* a somewhat minimal "reference" Kubernetes Operator for
>> Apache
>> >> > Flink, I don't see any blockers. If or when this happens, I'd see
>> some
>> >> > clear benefits of using a separate repository (easier independent
>> >> > versioning and releases, different build system & tooling (go, I
>> >> assume)).
>> >>
>> >> Naturally different contributors to the project have different focus.
>> >> Let's find out if there is strong enough interest to take this on and
>> >> strong enough commitment to maintain. As I see it, there is a
>> >> tremendous amount of internal investment going into operationalizing
>> >> Flink within many companies. Improvements to the operational side of
>> >> Flink like the operator would complement Flink nicely. I assume that
>> >> you are referring to a separate repository within Apache Flink, which
>> >> would give it the chance to achieve better sustainability than the
>> >> existing external operator efforts. There is also the fact that some
>> >> organizations which are heavily invested in operationalizing Flink are
>> >> allowing contributions to Apache Flink itself but less so to arbitrary
>> >> github projects. Regarding the tooling, it could well turn out that
>> >> Java is a good alternative given the ecosystem focus and that there is
>> >> an opportunity for reuse in certain aspects (metrics, logging etc.).
>> >>
>> >> [Yang] > I think Xintong has given a strong point why we introduced
>> >> the native K8s in

[jira] [Created] (FLINK-25654) Remove the redundant lock in SortMergeResultPartition

2022-01-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25654:
---

 Summary: Remove the redundant lock in SortMergeResultPartition
 Key: FLINK-25654
 URL: https://issues.apache.org/jira/browse/FLINK-25654
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.3
Reporter: Yingjie Cao
 Fix For: 1.15.0, 1.14.4


After FLINK-2372, the task canceler will never call the close method of 
ResultPartition; this reduces some race conditions and simplifies the code. 
This ticket aims to remove some redundant locks in SortMergeResultPartition.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25653) Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock

2022-01-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25653:
---

 Summary: Move buffer recycle in SortMergeSubpartitionReader out of 
lock to avoid deadlock
 Key: FLINK-25653
 URL: https://issues.apache.org/jira/browse/FLINK-25653
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.13.5, 1.14.3
Reporter: Yingjie Cao
 Fix For: 1.15.0, 1.13.6, 1.14.4


For the current sort-shuffle implementation, the different lock acquisition order in 
SortMergeSubpartitionReader and SortMergeResultPartitionReadScheduler can cause 
a deadlock. To solve the problem, we can move the buffer recycling in 
SortMergeSubpartitionReader out of the lock.
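
For illustration only (a schematic Java sketch, not the actual Flink code), the hazardous lock nesting described above and the fix of recycling buffers only after the first lock has been released:

public class LockOrderingSketch {

    private final Object readerLock = new Object();     // stands in for SortMergeSubpartitionReader's lock
    private final Object schedulerLock = new Object();  // stands in for the read scheduler's lock

    // Hazardous pattern: recycling happens while readerLock is still held, so this
    // path acquires readerLock -> schedulerLock while another path may acquire
    // schedulerLock -> readerLock, which can deadlock.
    void recycleWhileHoldingLock() {
        synchronized (readerLock) {
            // ... update reader state, collect finished buffers ...
            synchronized (schedulerLock) {
                // recycle buffers via the scheduler
            }
        }
    }

    // Fixed pattern: collect the buffers under readerLock, then recycle them after
    // the lock has been released, so the two locks are never nested.
    void recycleOutsideLock() {
        java.util.List<Object> toRecycle;
        synchronized (readerLock) {
            // ... update reader state ...
            toRecycle = new java.util.ArrayList<>();
        }
        synchronized (schedulerLock) {
            toRecycle.clear(); // recycle the collected buffers via the scheduler
        }
    }
}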



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-203: Incremental savepoints

2022-01-13 Thread Yu Li
Thanks for the update, Piotr!

> Is `state.backend.incremental` the only configuration parameter that can be
> used in this context?
According to FLIP-193 [1], all the existing checkpoint configurations are
actually for *Snapshots*; ownership (lifecycle) is the only difference
between Checkpoints and Savepoints, so I suggest we keep the description
aligned with FLIP-193.

> a) What about RocksDB upgrades? If we bump RocksDB version between Flink
> versions, do we support recovering from a native format snapshot
> (incremental checkpoint)?
Below are my two cents:
* The functionality of an incremental native-format savepoint is (like a
*snapshot* in a traditional database [2]) to quickly produce a persisted,
self-contained version of the current state of the job for point-in-time
recovery, but it cannot replace the canonical savepoint (like a *backup* in
a traditional database) for upgrading or switching state backends, etc.
* We prefer such functionality to be supplied by a *savepoint* instead of a
(retained) *checkpoint* because the life-cycle of the data should be
user-controlled rather than system-controlled [1].
* If we'd like to cover all the functionality the canonical savepoint has
today, a design for an incremental *canonical-format* savepoint would be
required, which is more complicated and could be considered future work.

Best Regards,
Yu

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership
[2] https://www.hitechnectar.com/blogs/snapshot-vs-backup/
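
As background for the configuration question above, a minimal Java sketch (assuming the Flink 1.14 API; the class and option names are the existing ones, not something introduced by this FLIP) of how incremental native-format snapshots are enabled today, equivalent to `state.backend.incremental: true` in flink-conf.yaml:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalSnapshotConfig {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // true = incremental snapshots, the same switch that `state.backend.incremental`
        // controls and that FLIP-193/FLIP-203 discuss for native-format snapshots
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.enableCheckpointing(60_000L); // periodic checkpoints every 60 seconds

        // ... define the job topology and call env.execute() ...
    }
}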


On Thu, 13 Jan 2022 at 19:40, Piotr Nowojski  wrote:

> Hi,
>
> Thanks for the comments and questions. Starting from the top:
>
> Seth: good point about schema evolution. Actually, I have a very similar
> question to State Processor API. Is it the same scenario in this case?
> Should it also be working with checkpoints but might be just untested?
>
> And next question, should we commit to supporting those two things (State
> Processor API and schema evolution) for native savepoints? What about
> aligned checkpoints? (please check [1] for that).
>
> Yu Li: 1, 2 and 4 done.
>
> > 3. How about changing the description of "the default configuration of
> the
> > checkpoints will be used to determine whether the savepoint should be
> > incremental or not" to something like "the `state.backend.incremental`
> > setting now denotes the type of native format snapshot and will take
> effect
> > for both checkpoint and savepoint (with native type)", to prevent concept
> > confusion between checkpoint and savepoint?
>
> Is `state.backend.incremental` the only configuration parameter that can be
> used in this context? I would guess not? What about for example
> "state.storage.fs.memory-threshold" or all of the Advanced RocksDB State
> Backends Options [2]?
>
> David:
>
> > does this mean that we need to keep the checkpoints compatible across
> minor
> > versions? Or can we say, that the minor version upgrades are only
> > guaranteed with canonical savepoints?
>
> Good question. Frankly I was always assuming that this is implicitly given.
> Otherwise users would not be able to recover jobs that are failing because
> of bugs in Flink. But I'm pretty sure that was never explicitly stated.
>
> As Konstantin suggested, I've written down the pre-existing guarantees of
> checkpoints and savepoints followed by two proposals on how they should be
> changed [1]. Could you take a look?
>
> I'm especially unsure about the following things:
> a) What about RocksDB upgrades? If we bump RocksDB version between Flink
> versions, do we support recovering from a native format snapshot
> (incremental checkpoint)?
> b) State Processor API - both pre-existing and what do we want to provide
> in the future
> c) Schema Evolution - both pre-existing and what do we want to provide in
> the future
>
> Best,
> Piotrek
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Checkpointvssavepointguarantees
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-rocksdb-state-backends-options
>
> On Tue, 11 Jan 2022 at 09:45, Konstantin Knauf  wrote:
>
> > Hi Piotr,
> >
> > would it be possible to provide a table that shows the
> > compatibility guarantees provided by the different snapshots going
> forward?
> > Like type of change (Topology. State Schema, Parallelism, ..) in one
> > dimension, and type of snapshot as the other dimension. Based on that, it
> > would be easier to discuss those guarantees, I believe.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Mon, Jan 3, 2022 at 9:11 AM David Morávek  wrote:
> >
> > > Hi Piotr,
> > >
> > > does this mean that we need to keep the checkpoints compatible across
> > minor
> > > versions? Or can we say, that the minor version upgrades are only
> > > guaranteed with canonical savepoints?
> > >
> > > My concern is especially if we'd want to change layout of the
> checkpoint.
> > >
> > > D.
> > >
> > >
> > >
> > > On Wed, Dec 29

[jira] [Created] (FLINK-25652) Can "duration" and "received records" be updated at the same time in WebUI's task detail?

2022-01-13 Thread jeff-zou (Jira)
jeff-zou created FLINK-25652:


 Summary: Can "duration" and "received records" be updated at the 
same time in WebUI's task detail?
 Key: FLINK-25652
 URL: https://issues.apache.org/jira/browse/FLINK-25652
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.13.2
Reporter: jeff-zou
 Attachments: 20220114145221.png

Can "duration“ and ”records received"  be updated at the same time in WebUI's 
task detail  ?

then I can get more precise QPS which the value is  equal to  ”records 
received" div "duration“. 

!20220114145221.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Creating an external connector repository

2022-01-13 Thread Martijn Visser
Hi everyone,

If you have any more comments or questions, do let me know. Else I'll open
up a vote thread next week.

Best regards,

Martijn

On Tue, 11 Jan 2022 at 20:13, Martijn Visser  wrote:

> Good question: we want to use the same setup as we currently have for
> Flink, so using the existing CI infrastructure.
>
> On Mon, 10 Jan 2022 at 11:19, Chesnay Schepler  wrote:
>
>> What CI resources do you actually intend use? Asking since the ASF GHA
>> resources are afaik quite overloaded.
>>
>> On 05/01/2022 11:48, Martijn Visser wrote:
>> > Hi everyone,
>> >
>> > I wanted to summarise the email thread and see if there are any open
>> items
>> > that still need to be discussed, before we can finalise the discussion
>> in
>> > this email thread:
>> >
>> > 1. About having multi connectors in one repo or each connector in its
>> own
>> > repository
>> >
>> > As explained by @Arvid Heise  we ultimately propose
>> to
>> > have a single repository per connector, which seems to be favoured in
>> the
>> > community.
>> >
>> > 2. About having the connector repositories under ASF or not.
>> >
>> > The consensus is that all connectors would remain under the ASF.
>> >
>> > I think we can categorise the questions or concerns that are brought
>> > forward as the following one:
>> >
>> > 3. How would we set up the testing?
>> >
>> > We need to make sure that we provide a proper testing framework, which
>> > means that we provide a public Source- and Sink testing framework. As
>> > mentioned extensively in the thread, we need to make sure that the
>> > necessary interfaces are properly annotated and at least
>> @PublicEvolving.
>> > This also includes the test infrastructure, like MiniCluster. For the
>> > latter, we don't know exactly yet how to balance having publicly
>> available
>> > test infrastructure vs being able to iterate inside of Flink, but we can
>> > all agree this has to be solved.
>> >
>> > For testing infrastructure, we would like to use Github Actions. In the
>> > current state, it probably makes sense for a connector repo to follow
>> the
>> > branching strategy of Flink. That will ensure a match between the
>> released
>> > connector and Flink version. This should change when all the Flink
>> > interfaces have stabilised so you can use a connector with multiple
>> Flink
>> > versions. That means that we should have a nightly build test for:
>> >
>> > - The `main` branch of the connector (which would be the unreleased
>> > version) against the `master` branch of Flink (the unreleased version of
>> > Flink).
>> > - Any supported `release-X.YY` branch of the connector against the
>> > `release-X.YY` branch of Flink.
>> >
>> > We should also have smoke-test E2E tests in Flink (one for DataStream,
>> > one for Table, one for SQL, one for Python) which load all the connectors
>> > and run an arbitrary test (post data to the source, load it into Flink, sink
>> > the output and verify that the output is as expected).
>> >
>> > 4. How would we integrate documentation?
>> >
>> > Documentation for a connector should probably end up in the connector
>> > repository. The Flink website should contain one entrance to all
>> connectors
>> > (so not the current approach where we have connectors per DataStream
>> API,
>> > Table API etc). Each connector documentation should end up as one menu
>> item
>> > in connectors, containing all necessary information for all DataStream,
>> > Table, SQL and Python implementations.
>> >
>> > 5. Which connectors should end up in the external connector repo?
>> >
>> > I'll open up a separate thread on this topic to have a parallel
>> discussion
>> > on that. We should reach consensus on both threads before we can move
>> > forward on this topic as a whole.
>> >
>> > Best regards,
>> >
>> > Martijn
>> >
>> > On Fri, 10 Dec 2021 at 04:47, Thomas Weise  wrote:
>> >
>> >> +1 for repo per connector from my side also
>> >>
>> >> Thanks for trying out the different approaches.
>> >>
>> >> Where would the common/infra pieces live? In a separate repository
>> >> with its own release?
>> >>
>> >> Thomas
>> >>
>> >> On Thu, Dec 9, 2021 at 12:42 PM Till Rohrmann 
>> >> wrote:
>> >>> Sorry if I was a bit unclear. +1 for the single repo per connector
>> >> approach.
>> >>> Cheers,
>> >>> Till
>> >>>
>> >>> On Thu, Dec 9, 2021 at 5:41 PM Till Rohrmann 
>> >> wrote:
>>  +1 for the single repo approach.
>> 
>>  Cheers,
>>  Till
>> 
>>  On Thu, Dec 9, 2021 at 3:54 PM Martijn Visser > >
>>  wrote:
>> 
>> > I also agree that it feels more natural to go with a repo for each
>> > individual connector. Each repository can be made available at
>> > flink-packages.org so users can find them, next to referring to
>> them
>> >> in
>> > documentation. +1 from my side.
>> >
>> > On Thu, 9 Dec 2021 at 15:38, Arvid Heise  wrote:
>> >
>> >> Hi all,
>> >>
>> >> We tried out Chesnay's proposal and went with Option 2.
>> >> Unfortunately,
>> > we
>> >>

Re: [DISCUSS] Moving connectors from Flink to external connector repositories

2022-01-13 Thread Martijn Visser
Hi everyone,

If you have any more comments or questions, please let me know. Else I
would open up a vote on this thread in the next couple of days.

Best regards,

Martijn

On Thu, 6 Jan 2022 at 09:45, Qingsheng Ren  wrote:

> Thanks Martijn for driving this!
>
> I’m +1 for Martijn’s proposal. It’s important to avoid making some
> connectors above others, and all connectors should share the same quality
> standard. Keeping some basic connectors like FileSystem is reasonable since
> it’s crucial for new users to try and explore Flink quickly.
>
> Another point I’d like to mention is that we need to add more E2E cases
> using basic connectors in the Flink main repo after we move connectors out.
> Currently E2E tests are heavily dependent on connectors. It’s essential to
> keep the coverage and quality of Flink main repo even without these
> connector’s E2E cases.
>
> Best regards,
>
> Qingsheng Ren
>
>
> > On Jan 5, 2022, at 9:59 PM, Martijn Visser 
> wrote:
> >
> > Hi everyone,
> >
> > As already mentioned in the previous discussion thread [1] I'm opening
> up a
> > parallel discussion thread on moving connectors from Flink to external
> > connector repositories. If you haven't read up on this discussion
> before, I
> > recommend reading that one first.
> >
> > The goal with the external connector repositories is to make it easier to
> > develop and release connectors by not being bound to the release cycle of
> > Flink itself. It should result in faster connector releases, a more
> active
> > connector community and a reduced build time for Flink.
> >
> > We currently have the following connectors available in Flink itself:
> >
> > * Kafka -> For DataStream & Table/SQL users
> > * Upsert-Kafka -> For Table/SQL users
> > * Cassandra -> For DataStream users
> > * Elasticsearch -> For DataStream & Table/SQL users
> > * Kinesis -> For DataStream users & Table/SQL users
> > * RabbitMQ -> For DataStream users
> > * Google Cloud PubSub -> For DataStream users
> > * Hybrid Source -> For DataStream users
> > * NiFi -> For DataStream users
> > * Pulsar -> For DataStream users
> > * Twitter -> For DataStream users
> > * JDBC -> For DataStream & Table/SQL users
> > * FileSystem -> For DataStream & Table/SQL users
> > * HBase -> For DataStream & Table/SQL users
> > * DataGen -> For Table/SQL users
> > * Print -> For Table/SQL users
> > * BlackHole -> For Table/SQL users
> > * Hive -> For Table/SQL users
> >
> > I would propose to move out all connectors except Hybrid Source,
> > FileSystem, DataGen, Print and BlackHole because:
> >
> > * We should avoid at all costs that certain connectors are considered as
> > 'Core' connectors. If that happens, it creates a perception that there
> are
> > first-grade/high-quality connectors because they are in 'Core' Flink and
> > second-grade/lesser-quality connectors because they are outside of the
> > Flink codebase. It directly hurts the goal, because these connectors are
> > still bound to the release cycle of Flink. Last but not least, it risks
> any
> > success of external connector repositories since every connector
> > contributor would still want to be in 'Core' Flink.
> > * To continue on the quality of connectors, we should aim that all
> > connectors are of high quality. That means that we shouldn't have a
> > connector that's only available for either DataStream or Table/SQL users,
> > but for both. It also means that (if applicable) the connector should
> > support all options, like bounded and unbounded scan, lookup, batch and
> > streaming sink capabilities. In the end the quality should depend on the
> > maintainers of the connector, not on where the code is maintained.
> > * The Hybrid Source connector is a special connector because of its
> > purpose.
> > * The FileSystem, DataGen, Print and BlackHole connectors are important
> for
> > first time Flink users/testers. If you want to experiment with Flink, you
> > will most likely start with a local file before moving to one of the
> other
> > sources or sinks. These 4 connectors can help with either reading/writing
> > local files or generating/displaying/ignoring data.
> > * Some of the connectors haven't been maintained in a long time (for
> > example, NiFi and Google Cloud PubSub). An argument could be made that we
> > check if we actually want to move such a connector or make the decision
> to
> > drop the connector entirely.
> >
> > I'm looking forward to your thoughts!
> >
> > Best regards,
> >
> > Martijn Visser | Product Manager
> >
> > mart...@ververica.com
> >
> > [1] https://lists.apache.org/thread/bywh947r2f5hfocxq598zhyh06zhksrm
> >
> > 
> >
> >
> > Follow us @VervericaData
> >
> > --
> >
> > Join Flink Forward  - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
>
>


Re: [DISCUSS] Releasing Flink 1.14.3

2022-01-13 Thread Martijn Visser
Hi all,

We still need a couple of PMCs to validate the release and put out their
vote. The vote thread can be found at
https://lists.apache.org/thread/cqn8p7kv4tbc4hn4czjvzfcd905jztro

Best regards,

Martijn

On Tue, 11 Jan 2022 at 16:29, Martijn Visser  wrote:

> Hi Thomas,
>
> Thanks! I'll prepare the website PR and send out the VOTE in a couple of
> hours.
>
> Best regards,
>
> Martijn
>
> On Tue, 11 Jan 2022 at 05:53, Thomas Weise  wrote:
>
>> Thank you Xingbo. I meanwhile also got my Azure pipeline working and
>> was able to build the artifacts. Although in general it would be nice
>> if not every release volunteer had to set up their separate Azure
>> environment.
>>
>> Martijn,
>>
>> The release is staged, except for the website PR:
>>
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351075&projectId=12315522
>> https://dist.apache.org/repos/dist/dev/flink/flink-1.14.3-rc1/
>> https://repository.apache.org/content/repositories/orgapacheflink-1481/
>> https://github.com/apache/flink/releases/tag/release-1.14.3-rc1
>>
>> Would you like to create the website PR and start a VOTE?
>>
>> (If not, I can look into that tomorrow.)
>>
>> Cheers,
>> Thomas
>>
>>
>>
>> On Sun, Jan 9, 2022 at 9:17 PM Xingbo Huang  wrote:
>> >
>> > Hi Thomas,
>> >
>> > Since multiple wheel packages with different python versions for mac and
>> > linux are generated, building locally requires you have multiple
>> machines
>> > with different os and Python environments. I have triggered the wheel
>> > package build of release-1.14.3-rc1 in my private Azure[1] and you can
>> > download the wheels after building successfully.
>> >
>> > [1]
>> >
>> https://dev.azure.com/hxbks2ks/FLINK-TEST/_build/results?buildId=1704&view=results
>> >
>> > Best,
>> > Xingbo
>> >
>> > On Mon, Jan 10, 2022 at 11:12, Thomas Weise  wrote:
>> >
>> > > Hi Martijn,
>> > >
>> > > I started building the release artifacts. The Maven part is ready.
>> > > Currently blocked on the Azure build for the PyFlink wheel packages.
>> > >
>> > > I had to submit a "Azure DevOps Parallelism Request" and that might
>> > > take a couple of days.
>> > >
>> > > Does someone have the steps to build the wheels locally?
>> > > Alternatively, if someone can build them on their existing setup and
>> > > point me to the result, that would speed up things as well.
>> > >
>> > > The release branch:
>> > > https://github.com/apache/flink/tree/release-1.14.3-rc1
>> > >
>> > > Thanks,
>> > > Thomas
>> > >
>> > > On Thu, Jan 6, 2022 at 9:14 PM Martijn Visser 
>> > > wrote:
>> > > >
>> > > > Hi Thomas,
>> > > >
>> > > > Thanks for volunteering! There was no volunteer yet, so would be
>> great if
>> > > > you could help out.
>> > > >
>> > > > Best regards,
>> > > >
>> > > > Martijn
>> > > >
>> > > > > On Fri, 7 Jan 2022 at 01:54, Thomas Weise  wrote:
>> > > >
>> > > > > Hi Martijn,
>> > > > >
>> > > > > Thanks for preparing the release. Did a volunteer check in with
>> you?
>> > > > > If not, I would like to take this up.
>> > > > >
>> > > > > Thomas
>> > > > >
>> > > > > On Mon, Dec 27, 2021 at 7:11 AM Martijn Visser <
>> mart...@ververica.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > Thank you all! That means that there's currently no more
>> blocker to
>> > > start
>> > > > > > with the Flink 1.14.3 release.
>> > > > > >
>> > > > > > The only thing that's needed is a committer that's willing to
>> follow
>> > > the
>> > > > > > release process [1] Any volunteers?
>> > > > > >
>> > > > > > Best regards,
>> > > > > >
>> > > > > > Martijn
>> > > > > >
>> > > > > > [1]
>> > > > > >
>> > > > >
>> > >
>> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
>> > > > > >
>> > > > > > On Mon, 27 Dec 2021 at 03:17, Qingsheng Ren > >
>> > > wrote:
>> > > > > >
> > > > > > > > Hi Martijn,
>> > > > > > >
>> > > > > > > FLINK-25132 has been merged to master and release-1.14.
>> > > > > > >
>> > > > > > > Thanks for your work for releasing 1.14.3!
>> > > > > > >
>> > > > > > > Cheers,
>> > > > > > >
>> > > > > > > Qingsheng Ren
>> > > > > > >
>> > > > > > > > On Dec 26, 2021, at 3:46 PM, Konstantin Knauf <
>> kna...@apache.org
>> > > >
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > > Hi Martijn,
>> > > > > > > >
>> > > > > > > > FLINK-25375 is merged to release-1.14.
>> > > > > > > >
>> > > > > > > > Cheers,
>> > > > > > > >
>> > > > > > > > Konstantin
>> > > > > > > >
>> > > > > > > > On Wed, Dec 22, 2021 at 12:02 PM David Morávek <
>> d...@apache.org>
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > >> Hi Martijn, FLINK-25271 has been merged to 1.14 branch.
>> > > > > > > >>
>> > > > > > > >> Best,
>> > > > > > > >> D.
>> > > > > > > >>
>> > > > > > > >> On Wed, Dec 22, 2021 at 7:27 AM 任庆盛 
>> wrote:
>> > > > > > > >>
> > > > > > > >>> Hi Martijn,
>> > > > > > > >>>
>> > > > > > > >>> Thanks for the effort on Flink 1.14.3. FLINK-25132 has
>> been
>> > > merged
>> > > > > on
>> > > > > > > >>> master and is waiting for CI on release-1.14. I think it
>> can be
>> > > >

[jira] [Created] (FLINK-25651) Flink1.14.2 DataStream Connectors Kafka Deserializer example method uses the wrong parameter

2022-01-13 Thread shouzuo meng (Jira)
shouzuo meng created FLINK-25651:


 Summary: Flink1.14.2 DataStream Connectors Kafka Deserializer 
example method uses the wrong parameter
 Key: FLINK-25651
 URL: https://issues.apache.org/jira/browse/FLINK-25651
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.2
Reporter: shouzuo meng
 Attachments: deserializer.png

The official documentation's DataStream Connectors Kafka Deserializer section 
introduces KafkaRecordDeserializationSchema.valueOnly using the wrong 
parameters.
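
For reference, a minimal sketch (assuming the Flink 1.14 KafkaSource builder API; the broker address and topic are placeholders, and this is not necessarily the exact snippet the documentation should contain) of the valueOnly variant that takes a Kafka Deserializer class:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaValueOnlyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder broker address
                .setTopics("input-topic")                // placeholder topic
                .setGroupId("example-group")
                // valueOnly(...) wraps a plain Kafka value deserializer and ignores record keys
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
        env.execute("Kafka valueOnly example");
    }
}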



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-01-13 Thread Junfan Zhang
Hi G

Thanks for explaining in detail. I understand your thoughts, and in any case 
this proposal
is a great improvement.

Looking forward to your implementation; I will keep following it.
Thanks again.

Best
JunFan.
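
For reference, a minimal Java sketch (illustration only; flink-conf.yaml is the usual place for this setting) of turning off the existing `security.kerberos.fetch.delegation-token` switch discussed below:

import org.apache.flink.configuration.Configuration;

public class DisableDelegationTokenFetch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Pre-existing switch referenced in this thread; false skips delegation
        // token fetching, e.g. when tokens are provided by external tooling.
        conf.setBoolean("security.kerberos.fetch.delegation-token", false);
        System.out.println(conf);
    }
}
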
On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi , 
wrote:
> Just to confirm keeping "security.kerberos.fetch.delegation-token" is added
> to the doc.
>
> BR,
> G
>
>
> On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi 
> wrote:
>
> > Hi JunFan,
> >
> > > By the way, maybe this should be added in the migration plan or
> > integration section in the FLIP-211.
> >
> > Going to add this soon.
> >
> > > Besides, I have a question that the KDC will collapse when the cluster
> > reached 200 nodes you described
> > in the google doc. Do you have any attachment or reference to prove it?
> >
> > "KDC *may* collapse under some circumstances" is the proper wording.
> >
> > We have several customers who are executing workloads on Spark/Flink. Most
> > of the time I'm facing their
> > daily issues which is heavily environment and use-case dependent. I've
> > seen various cases:
> > * where the mentioned ~1k nodes were working fine
> > * where KDC thought the number of requests are coming from DDOS attack so
> > discontinued authentication
> > * where KDC was simply not responding because of the load
> > * where KDC intermittently had some outages (this was the nastiest
> > thing)
> >
> > Since you're managing a relatively big cluster, you know that KDC is not
> > only used by Spark/Flink workloads
> > but the whole company IT infrastructure is bombing it, so it really depends
> > on other factors too whether KDC is reaching
> > its limit or not. Not sure what kind of evidence you are looking for, but
> > I'm not authorized to share any information about
> > our clients' data.
> >
> > One thing is for sure: the more external system types are used in
> > workloads (for example HDFS, HBase, Hive, Kafka) which
> > authenticate through KDC, the more likely it is to reach this
> > threshold when the cluster is big enough.
> >
> > All in all this feature is here to help all users never reach this
> > limitation.
> >
> > BR,
> > G
> >
> >
> > On Thu, Jan 13, 2022 at 1:00 PM 张俊帆  wrote:
> >
> > > Hi G
> > >
> > > Thanks for your quick reply. I think reserving the config of
> > > *security.kerberos.fetch.delegation-token*
> > > and simplifying how to disable token fetching is a good idea. By the way,
> > > maybe this should be added
> > > in the migration plan or integration section in the FLIP-211.
> > >
> > > Besides, I have a question that the KDC will collapse when the cluster
> > > reached 200 nodes you described
> > > in the google doc. Do you have any attachment or reference to prove it?
> > > Because in our internal per-cluster,
> > > the nodes reaches > 1000 and KDC looks good. Do i missed or misunderstood
> > > something? Please correct me.
> > >
> > > Best
> > > JunFan.
> > > On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:
> > > >
> > > >
> > > https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
> > >
> >


Re: Flink native k8s integration vs. operator

2022-01-13 Thread Chenya Zhang
Thanks Thomas, Gyula, and Marton for driving this effort! It would greatly
ease the adoption of Apache Flink on Kubernetes and help to address the
current operational pain points as mentioned. Look forward to the proposal
and more discussions!

Best,
Chenya

On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi 
wrote:

> Hi All,
>
> I am pleased to see the level of enthusiasm and technical consideration
> already emerging in this thread. I wholeheartedly support building an
> operator and endorsing it via placing it under the Apache Flink umbrella
> (as a separate repository) as the current lack of it is clearly becoming an
> adoption bottleneck for large scale Flink users. The next logical step is
> to write a FLIP to agree on the technical details, so that we can put
> forward the proposal to the Flink PMC for creating a new repository with a
> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
> initial wording on the proposal which we will put up for public discussion
> in the coming weeks.
>
> Best,
> Marton
>
> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
> wrote:
>
> > Hi Thomas,
> >
> > Yes, I was referring to a separate repository under Apache Flink.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks for the feedback and discussion. A few additional thoughts:
> >>
> >> [Konstantin] > With respect to common lifecycle management operations:
> >> these features are
> >> > not available (within Apache Flink) for any of the other resource
> >> providers
> >> > (YARN, Standalone) either. From this perspective, I wouldn't consider
> >> this
> >> > a shortcoming of the Kubernetes integration.
> >>
> >> I think time and evolution of the ecosystem are factors to consider as
> >> well. The state and usage of Flink was much different when YARN
> >> integration was novel. Expectations are different today and the
> >> lifecycle functionality provided by an operator may as well be
> >> considered essential to support the concept of a Flink application on
> >> k8s. After a few years of learning from operator experience outside of
> >> Flink, it might be a good time to fill the gap.
> >>
> >> [Konstantin] > I still believe that we should keep this focus on low
> >> > level composable building blocks (like Jobs and Snapshots) in Apache
> >> Flink
> >> > to make it easy for everyone to build fitting higher level
> abstractions
> >> > like a FlinkApplication Custom Resource on top of it.
> >>
> >> I completely agree that it is important that the basic functions of
> >> Flink are solid and continued focus is necessary. Thanks for sharing
> >> the pointers, these are great improvements. At the same time,
> >> ecosystem, contributor base and user spectrum are growing. There have
> >> been significant additions in many areas of Flink including connectors
> >> and higher level abstractions like statefun, SQL and Python. It's also
> >> evident from additional repositories/subprojects that we have in Flink
> >> today.
> >>
> >> [Konstantin] > Having said this, if others in the community have the
> >> capacity to push and
> >> > *maintain* a somewhat minimal "reference" Kubernetes Operator for
> Apache
> >> > Flink, I don't see any blockers. If or when this happens, I'd see some
> >> > clear benefits of using a separate repository (easier independent
> >> > versioning and releases, different build system & tooling (go, I
> >> assume)).
> >>
> >> Naturally different contributors to the project have different focus.
> >> Let's find out if there is strong enough interest to take this on and
> >> strong enough commitment to maintain. As I see it, there is a
> >> tremendous amount of internal investment going into operationalizing
> >> Flink within many companies. Improvements to the operational side of
> >> Flink like the operator would complement Flink nicely. I assume that
> >> you are referring to a separate repository within Apache Flink, which
> >> would give it the chance to achieve better sustainability than the
> >> existing external operator efforts. There is also the fact that some
> >> organizations which are heavily invested in operationalizing Flink are
> >> allowing contributions to Apache Flink itself but less so to arbitrary
> >> github projects. Regarding the tooling, it could well turn out that
> >> Java is a good alternative given the ecosystem focus and that there is
> >> an opportunity for reuse in certain aspects (metrics, logging etc.).
> >>
> >> [Yang] > I think Xintong has given a strong point why we introduced
> >> the native K8s integration, which is active resource management.
> >> > I have a concrete example for this in the production. When a K8s node
> >> is down, the standalone K8s deployment will take longer
> >> > recovery time based on the K8s eviction time(IIRC, default is 5
> >> minutes). For the native K8s integration, Flink RM could be aware of the
> >> > TM heartbeat lost and allocate a new one

Re: [DISCUSS] FLIP-208: Update KafkaSource to detect EOF based on de-serialized record

2022-01-13 Thread Dong Lin
Hi Qingsheng,

Thank you for extending this FLIP to support this feature with Table/SQL. I
also prefer the 2nd option over the 1st option. This is because the 1st
option requires the user to specify an identifier in addition
to defining the RecordEvaluator themselves. Note that this is unlike
InputFormat, where most users will use off-the-shelf de-serializers in
Flink instead of implementing de-serializers by themselves.

Could you help update FLIP-208 with the proposed changes as well as the
explanation of the rejected alternative?

Thanks,
Dong
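
For illustration, a hypothetical Java sketch of a user-supplied evaluator and how it would be referenced under the second (class-name) option quoted below; the interface shape, method name and the `record.evaluator.class` option follow this discussion and are not a final API:

import java.io.Serializable;

// Hypothetical shape of the evaluator proposed in FLIP-208; the real interface
// and its package may end up looking different.
interface RecordEvaluator<T> extends Serializable {
    /** Returns true when the given (already de-serialized) record marks end of stream. */
    boolean isEndOfStream(T record);
}

// A user-defined evaluator that stops consumption once a sentinel value is seen.
// Under the class-name option it would be referenced in the table options as
// `record.evaluator.class` = `com.mycompany.evaluator.StopOnSentinelEvaluator`.
public class StopOnSentinelEvaluator implements RecordEvaluator<String> {
    @Override
    public boolean isEndOfStream(String record) {
        return "END_OF_STREAM".equals(record);
    }
}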



On Thu, Jan 13, 2022 at 5:18 PM Qingsheng Ren  wrote:

> Thanks Dong for the explanation!
>
> I agree with Dong’s idea of keeping the consistency of APIs for setting
> configurations, so I think it’s acceptable for me to pass the record
> evaluator from XXXSourceBuilder and embed it into SourceReaderBase. Also
> considering that currently only the Kafka source respects the
> DeserializationSchema#isEndOfStream interface, it’s OK to implement just the Kafka
> and Pulsar connectors for now.
>
> Another thing I’d like to mention is about using this feature in Table/SQL
> API. Currently I have two kinds of implementations in my mind:
>
> 1. Similar to format / deserializer, we introduce a factory for
> RecordEvaluator, and users need to specify the factory identifier in table
> options:
>
> CREATE TABLE `kafka` (…) WITH (
> `connector` = `kafka`,
> `record.evaluator` =  `my-evaluator-factory-identifier`
> )
>
> 2. Directly use full class path in table options:
>
> CREATE TABLE `kafka` (…) WITH (
> `connector` = `kafka`,
> `record.evaluator.class` =  `com.mycompany.evaluator.MyEvaluator`
> )
>
> Personally I prefer the second one, because it’s easier for users to
> implement their own RecordEvaluators.
>
> What do you think?
>
>
> > On Jan 13, 2022, at 11:39 AM, Dong Lin  wrote:
> >
> > Hi Fabian,
> >
> > Thank you for the explanation.
> >
> > The current approach needs to add new constructors for SourceReaderBase
> > and SingleThreadMultiplexSourceReaderBase. This proposed change has now
> > been included in the Public Interfaces section in the FLIP.
> >
> > And yes, I also agree it would be preferred if developers do not have to
> > change their SourceReaders to implement this new logic. The suggestion
> > mentioned by Qingsheng in this thread could achieve this goal. Qingsheng's
> > idea is to let the user specify an eofRecordEvaluator via
> > StreamExecutionEnvironment::fromSource(...).withEofRecordEvaluator(...)
> and
> > pass this evaluator through DataStreamSource to SourceOperator. And
> > SourceOperator::emitNext(...) could use this evaluator as appropriate.
> >
> > For now I have chosen not to use this approach because this approach
> > requires users to pass some source configuration via
> > StreamExecutionEnvironment::fromSource(...) and some other source
> > configuration via e.g. KafkaSourceBuilder(...). This might create a sense
> > of inconsistency/confusion. Given that the number of connector users are
> > much more than the number of connector developers, I believe it is
> probably
> > better to optimize the user experience in this case.
> >
> > The description of this alternative approach and its pros/cons has been
> > included in the FLIP.
> >
> > And yep, I think I understand your suggestion. Indeed those connector
> > configs (e.g. de-serializer, boundedness, eofRecordEvaluator) can be
> passed
> > from XXXSourceBuilder to their shared infra (e.g. SourceReaderBase). Both
> > solutions work. Given that the existing configs (e.g. serializer) are
> > already passed to SourceReaderBase via the constructor parameter, I guess
> > it is simpler to follow the existing pattern for now.
> >
> > Regards,
> > Dong
> >
> >
> > On Wed, Jan 12, 2022 at 11:17 PM Fabian Paul  wrote:
> >
> >> Hi Dong,
> >>
> >> I think I am beginning to understand your idea. Since SourceReaderBase
> >> is marked as PublicEvolving can you also update the FLIP with the
> >> changes you want to make to it? Ideally, connector developers do not
> >> have to change their SourceReaders to implement this new logic.
> >>
> >> My idea was to introduce a second source interface that extends the
> >> existing interface and offers only the method getRecordEvaluator().
> >> The record evaluator is still passed as you have described through the
> >> builder and at the end held by the source object. This way the source
> >> framework can automatically use the evaluator without the need that
> >> connector developers have to implement the complicated stopping logic
> >> or change their SourceReaders.
> >>
> >> Best,
> >> Fabian
> >>
> >>
> >> On Wed, Jan 12, 2022 at 2:22 AM Dong Lin  wrote:
> >>>
> >>> Hi Fabian,
> >>>
> >>> Thanks for the comments. Please see my reply inline.
> >>>
> >>> On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul  wrote:
> >>>
>  Hi Dong,
> 
>  I wouldn't change the org.apache.flink.api.connector.source.Source
>  interface because it eit

Re: Flink native k8s integration vs. operator

2022-01-13 Thread Márton Balassi
Hi All,

I am pleased to see the level of enthusiasm and technical consideration
already emerging in this thread. I wholeheartedly support building an
operator and endorsing it via placing it under the Apache Flink umbrella
(as a separate repository) as the current lack of it is clearly becoming an
adoption bottleneck for large scale Flink users. The next logical step is
to write a FLIP to agree on the technical details, so that we can put
forward the proposal to the Flink PMC for creating a new repository with a
clear purpose in mind. I volunteer to work with Thomas and Gyula on the
initial wording on the proposal which we will put up for public discussion
in the coming weeks.

Best,
Marton

On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf  wrote:

> Hi Thomas,
>
> Yes, I was referring to a separate repository under Apache Flink.
>
> Cheers,
>
> Konstantin
>
> On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
>
>> Hi everyone,
>>
>> Thanks for the feedback and discussion. A few additional thoughts:
>>
>> [Konstantin] > With respect to common lifecycle management operations:
>> these features are
>> > not available (within Apache Flink) for any of the other resource
>> providers
>> > (YARN, Standalone) either. From this perspective, I wouldn't consider
>> this
>> > a shortcoming of the Kubernetes integration.
>>
>> I think time and evolution of the ecosystem are factors to consider as
>> well. The state and usage of Flink was much different when YARN
>> integration was novel. Expectations are different today and the
>> lifecycle functionality provided by an operator may as well be
>> considered essential to support the concept of a Flink application on
>> k8s. After a few years of learning from operator experience outside of
>> Flink, it might be a good time to fill the gap.
>>
>> [Konstantin] > I still believe that we should keep this focus on low
>> > level composable building blocks (like Jobs and Snapshots) in Apache
>> Flink
>> > to make it easy for everyone to build fitting higher level abstractions
>> > like a FlinkApplication Custom Resource on top of it.
>>
>> I completely agree that it is important that the basic functions of
>> Flink are solid and continued focus is necessary. Thanks for sharing
>> the pointers, these are great improvements. At the same time,
>> ecosystem, contributor base and user spectrum are growing. There have
>> been significant additions in many areas of Flink including connectors
>> and higher level abstractions like statefun, SQL and Python. It's also
>> evident from additional repositories/subprojects that we have in Flink
>> today.
>>
>> [Konstantin] > Having said this, if others in the community have the
>> capacity to push and
>> > *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
>> > Flink, I don't see any blockers. If or when this happens, I'd see some
>> > clear benefits of using a separate repository (easier independent
>> > versioning and releases, different build system & tooling (go, I
>> assume)).
>>
>> Naturally different contributors to the project have different focus.
>> Let's find out if there is strong enough interest to take this on and
>> strong enough commitment to maintain. As I see it, there is a
>> tremendous amount of internal investment going into operationalizing
>> Flink within many companies. Improvements to the operational side of
>> Flink like the operator would complement Flink nicely. I assume that
>> you are referring to a separate repository within Apache Flink, which
>> would give it the chance to achieve better sustainability than the
>> existing external operator efforts. There is also the fact that some
>> organizations which are heavily invested in operationalizing Flink are
>> allowing contributions to Apache Flink itself but less so to arbitrary
>> github projects. Regarding the tooling, it could well turn out that
>> Java is a good alternative given the ecosystem focus and that there is
>> an opportunity for reuse in certain aspects (metrics, logging etc.).
>>
>> [Yang] > I think Xintong has given a strong point why we introduced
>> the native K8s integration, which is active resource management.
>> > I have a concrete example for this in the production. When a K8s node
>> is down, the standalone K8s deployment will take longer
>> > recovery time based on the K8s eviction time(IIRC, default is 5
>> minutes). For the native K8s integration, Flink RM could be aware of the
>> > TM heartbeat lost and allocate a new one timely.
>>
>> Thanks for sharing this, we should evaluate it as part of a proposal.
>> If we can optimize recovery or scaling with active resource management
>> then perhaps it is worth to support it through the operator.
>> Previously mentioned operators all rely on the standalone model.
>>
>> Cheers,
>> Thomas
>>
>> On Wed, Jan 12, 2022 at 3:21 AM Konstantin Knauf 
>> wrote:
>> >
>> > cc dev@
>> >
>> > Hi Thomas, Hi everyone,
>> >
>> > Thank you for starting this discussion and sorry for chiming 

Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Maciej Obuchowski
Hello,

I'm an OpenLineage committer - and previously, a minor Flink contributor.
The OpenLineage community is very interested in the conversation about Flink
metadata, and we'll be happy to cooperate with the Flink community.

Best,
Maciej Obuchowski



On Thu, 13 Jan 2022 at 18:12, Martijn Visser  wrote:
>
> Hi all,
>
> @Andrew thanks for sharing that!
>
> @Tero good point, I should have clarified the purpose. I want to understand
> what "metadata platforms" tools are used or evaluated by the Flink
> community, what's their purpose for using such a tool (is it as a generic
> catalogue, as a data discovery tool, is lineage the important part etc) and
> what problems are people trying to solve with them. This space is
> developing rapidly and there are many open source and commercial tools
> popping up/growing, which is also why I'm trying to keep an open vision on
> how this space is evolving.
>
> If the Flink community wants to integrate with metadata tools, I fully
> agree that ideally we do that via standards. My perception is at this
> moment that no clear standard has yet been established. You mentioned
> open-metadata.org, but I believe https://openlineage.io/ is also an
> alternative standard.
>
> Best regards,
>
> Martijn
>
> On Thu, 13 Jan 2022 at 17:00, Tero Paananen  wrote:
>
> > > I'm currently checking out different metadata platforms, such as
> > Amundsen [1] and Datahub [2]. In short, these types of tools try to address
> > problems related to topics such as data discovery, data lineage and an
> > overall data catalogue.
> > >
> > > I'm reaching out to the Dev and User mailing lists to get some feedback.
> > It would really help if you could spend a couple of minutes to let me know
> > if you already use either one of the two mentioned metadata platforms or
> > another one, or are you evaluating such tools? If so, is that for the
> > purpose as a catalogue, for lineage or anything else? Any type of feedback
> > on these types of tools is appreciated.
> >
> > I hope you don't mind answers off-list.
> >
> > You didn't say what purpose you're evaluating these tools for, but if
> > you're evaluating platforms for integration with Flink, I wouldn't
> > approach it with a particular product in mind. Rather I'd create some
> > sort of facility to propagate metadata and/or lineage information in a
> > generic way and allow Flink users to plug in their favorite metadata
> > tool. Using standards like OpenLineage, for example. I believe Egeria
> > is also trying to create an open standard for metadata.
> >
> > If you're evaluating data catalogs for personal use or use in a
> > particular project, Andrew's answer about the Wikimedia evaluation is
> > a good start. It's missing OpenMetadata (https://open-metadata.org/).
> > That one is showing a LOT of promise. Wikimedia's evaluation is also
> > missing industry leading commercial products (understandably, given
> > their mission). Collibra and Alation are probably the ones that pop up
> > most often.
> >
> > I have personally looked into both DataHub and Amundsen. My high level
> > feedback is that DataHub is overengineered, and using proprietary
> > LinkedIn technology platform(s), which aren't widely used anywhere.
> > Amundsen is much less flexible than DataHub and quite basic in its
> > functionality. If you need anything beyond what it already offers,
> > good luck.
> >
> > We dumped Amundsen in favor of OpenMetadata a few months back. We
> > don't have enough data points to fully evaluate OpenMetadata yet.
> >
> > -TPP
> >


Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Martijn Visser
Hi all,

@Andrew thanks for sharing that!

@Tero good point, I should have clarified the purpose. I want to understand
what "metadata platforms" tools are used or evaluated by the Flink
community, what's their purpose for using such a tool (is it as a generic
catalogue, as a data discovery tool, is lineage the important part etc) and
what problems are people trying to solve with them. This space is
developing rapidly and there are many open source and commercial tools
popping up/growing, which is also why I'm trying to keep an open vision on
how this space is evolving.

If the Flink community wants to integrate with metadata tools, I fully
agree that ideally we do that via standards. My perception is at this
moment that no clear standard has yet been established. You mentioned
open-metadata.org, but I believe https://openlineage.io/ is also an
alternative standard.

Best regards,

Martijn

On Thu, 13 Jan 2022 at 17:00, Tero Paananen  wrote:

> > I'm currently checking out different metadata platforms, such as
> Amundsen [1] and Datahub [2]. In short, these types of tools try to address
> problems related to topics such as data discovery, data lineage and an
> overall data catalogue.
> >
> > I'm reaching out to the Dev and User mailing lists to get some feedback.
> It would really help if you could spend a couple of minutes to let me know
> if you already use either one of the two mentioned metadata platforms or
> another one, or are you evaluating such tools? If so, is that for the
> purpose as a catalogue, for lineage or anything else? Any type of feedback
> on these types of tools is appreciated.
>
> I hope you don't mind answers off-list.
>
> You didn't say what purpose you're evaluating these tools for, but if
> you're evaluating platforms for integration with Flink, I wouldn't
> approach it with a particular product in mind. Rather I'd create some
> sort of facility to propagate metadata and/or lineage information in a
> generic way and allow Flink users to plug in their favorite metadata
> tool. Using standards like OpenLineage, for example. I believe Egeria
> is also trying to create an open standard for metadata.
>
> If you're evaluating data catalogs for personal use or use in a
> particular project, Andrew's answer about the Wikimedia evaluation is
> a good start. It's missing OpenMetadata (https://open-metadata.org/).
> That one is showing a LOT of promise. Wikimedia's evaluation is also
> missing industry leading commercial products (understandably, given
> their mission). Collibra and Alation are probably the ones that pop up
> most often.
>
> I have personally looked into both DataHub and Amundsen. My high level
> feedback is that DataHub is overengineered, and using proprietary
> LinkedIn technology platform(s), which aren't widely used anywhere.
> Amundsen is much less flexible than DataHub and quite basic in its
> functionality. If you need anything beyond what it already offers,
> good luck.
>
> We dumped Amundsen in favor of OpenMetadata a few months back. We
> don't have enough data points to fully evaluate OpenMetadata yet.
>
> -TPP
>


Re: [DISCUSS] FLIP-210: Change logging level dynamically at runtime

2022-01-13 Thread Konstantin Knauf
Thanks, Wenhao.

On Thu, Jan 13, 2022 at 4:19 PM Wenhao Ji  wrote:

> It seems that we have reached a consensus that the proposal will not
> be implemented in Flink. I will mark the FLIP as discarded if there
> are no objections.
>
> Thanks, everyone, for joining the discussion again!
>
> Wenhao
>
>
> On Tue, Jan 11, 2022 at 11:12 PM Wenhao Ji  wrote:
> >
> > Hi all,
> >
> > Yes, indeed.
> > After I did some investigation on similar features provided by the
> > Cloud platforms, I actually found several popular Clouds have already
> > offered this.
> >
> > - AWS Kinesis: Setting the Application Logging Level [1], which is
> > implemented by UpdateApplication API [2].
> > - Ververica: Logging & Metrics[3], by changing the template.
> > - Alicloud: Configure job logs [4], which is quite similar to
> > Ververica also by changing the template
> > - Cloudera: Enabling Flink DEBUG logging[5], by changing the
> > Configuration and triggering a restart
> >
> > It looks like this feature is not necessary. It has been developed in
> > one way or another by so many platforms in the ecosystem.
> >
> > [1]:
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/cloudwatch-logs.html#cloudwatch-level
> > [2]:
> https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_UpdateApplication.html
> > [3]: https://docs.ververica.com/platform_operations/logging_metrics.html
> > [4]:
> https://www.alibabacloud.com/help/doc-detail/173646.htm#title-1ay-hju-pka
> > [5]:
> https://docs.cloudera.com/csa/1.6.0/monitoring/topics/csa-ssb-enabling-debug-logging.html
> >
> > Thanks,
> > Wenhao
> >
> > On Tue, Jan 11, 2022 at 8:24 PM Martijn Visser 
> wrote:
> > >
> > > Hi all,
> > >
> > > I agree with Konstantin, this feels like a problem that shouldn't be
> solved
> > > via Apache Flink but via the logging ecosystem itself.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Tue, 11 Jan 2022 at 13:11, Konstantin Knauf 
> wrote:
> > >
> > > > I've now read over the discussion on the ticket, and I am personally
> not in
> > > > favor of adding this functionality to Flink via the REST API or Web
> UI. I
> > > > believe that changing the logging configuration via the existing
> > > > configuration files (log4j or logback) is good enough, to justify not
> > > > increasing the scope of Flink in that direction. As you specifically
> > > > mention YARN: doesn't Cloudera's Hadoop platform, for example, offer
> means
> > > > to manage the configuration files for all worker nodes from a central
> > > > configuration management system? It overall feels like we are trying
> to
> > > > solve a problem in Apache Flink that is already solved in its
> ecosystem and
> > > > increases the scope of the project without adding core value. I also
> expect
> > > > that over time the exposed logging configuration options would
> become more
> > > > and more complex.
> > > >
> > > > I am curious to hear what others think.
> > > >
> > > > On Tue, Jan 11, 2022 at 10:34 AM Chesnay Schepler <
> ches...@apache.org>
> > > > wrote:
> > > >
> > > > > Reloading the config from the filesystem  is already enabled by
> default;
> > > > > that was one of the things that made us switch to Log4j 2.
> > > > >
> > > > > The core point of contention w.r.t. this topic is whether having
> the
> > > > > admin ssh into the machine is too inconvenient.
> > > > >
> > > > > Personally I still think that the current capabilities are
> > > > > sufficient, and I do not want us to rely on internals of the
> logging
> > > > > backends in production code.
> > > > >
> > > > > On 10/01/2022 17:26, Konstantin Knauf wrote:
> > > > > > Thank you for starting the discussion. Being able to change the
> logging
> > > > > > level at runtime is very valuable in my experience.
> > > > > >
> > > > > > Instead of introducing our own API (and eventually even
> persistence),
> > > > > could
> > > > > > we just periodically reload the log4j or logback configuration
> from the
> > > > > > environment/filesystem? I only quickly googled the topic and
> [1,2]
> > > > > suggest
> > > > > > that this might be possible?
> > > > > >
> > > > > > [1] https://stackoverflow.com/a/16216956/6422562?
> > > > > > [2] https://logback.qos.ch/manual/configuration.html#autoScan
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Jan 10, 2022 at 5:10 PM Wenhao Ji <
> predator@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Hi everyone,
> > > > > >>
> > > > > >> Hope you enjoyed the Holiday Season.
> > > > > >>
> > > > > >> I would like to start the discussion on the improvement proposal
> > > > > >> FLIP-210 [1] which aims to provide a way to change log levels at
> > > > > >> runtime to simplify issues and bugs detection as reported in the
> > > > > >> ticket FLINK-16478 [2].
> > > > > >> Firstly, thanks Xingxing Di and xiaodao for their previous
> effort. The
> > > > > >> FLIP I drafted is largely influenced by their previous designs
> [3][4].
> > > > > >> Although we have r
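
For reference, the kind of backend-specific call that changing the log level at runtime ultimately relies on (and which Chesnay cautions against depending on in production code) already exists in the Log4j 2 API. This is only a sketch; the logger name and levels are illustrative and it needs a dependency on log4j-core:

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public final class LogLevelSwitch {
    public static void main(String[] args) {
        // Raise the verbosity of one logger hierarchy at runtime.
        Configurator.setLevel("org.apache.flink.runtime.checkpoint", Level.DEBUG);
        // Or change the root logger level.
        Configurator.setRootLevel(Level.INFO);
    }
}

The file-based alternative discussed above (editing log4j.properties or logback.xml and letting the backend re-read it) needs no code at all, which is the main argument against adding such an API to Flink.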

Re: [DISCUSS] Future of Per-Job Mode

2022-01-13 Thread Thomas Weise
Regarding session mode:

## Session Mode
* main() method executed in client

Session mode also supports execution of the main method on Jobmanager
with submission through the REST API. That's how Flink k8s operators like
[1] work. It's actually an important capability because it allows for
allocation of the cluster resources prior to taking down the previous
job during an upgrade, when the goal is optimizing for availability.

Thanks,
Thomas

[1] https://github.com/lyft/flinkk8soperator

On Thu, Jan 13, 2022 at 12:32 AM Konstantin Knauf  wrote:
>
> Hi everyone,
>
> I would like to discuss and understand if the benefits of having Per-Job
> Mode in Apache Flink outweigh its drawbacks.
>
>
> *# Background: Flink's Deployment Modes*
> Flink currently has three deployment modes. They differ in the following
> dimensions:
> * main() method executed on Jobmanager or Client
> * dependencies shipped by client or bundled with all nodes
> * number of jobs per cluster & relationship between job and cluster
> lifecycle
> * (supported resource providers)
>
> ## Application Mode
> * main() method executed on Jobmanager
> * dependencies already need to be available on all nodes
> * dedicated cluster for all jobs executed from the same main()-method
> (Note: applications with more than one job currently still have significant
> limitations, like missing high-availability). Technically, a session cluster
> dedicated to all jobs submitted from the same main() method.
> * supported by standalone, native kubernetes, YARN
>
> ## Session Mode
> * main() method executed in client
> * dependencies are distributed from and by the client to all nodes
> * cluster is shared by multiple jobs submitted from different clients,
> independent lifecycle
> * supported by standalone, Native Kubernetes, YARN
>
> ## Per-Job Mode
> * main() method executed in client
> * dependencies are distributed from and by the client to all nodes
> * dedicated cluster for a single job
> * supported by YARN only
>
>
> *# Reasons to Keep*
> * There are use cases where you might need the combination of a single job
> per cluster, but main() method execution in the client. This combination is
> only supported by per-job mode.
> * It currently exists. Existing users will need to migrate to either
> session or application mode.
>
>
> *# Reasons to Drop*
> * With Per-Job Mode and Application Mode we have two modes that for most
> users probably do the same thing. Specifically, for
> those users that don't care where the main() method is executed and want to
> submit a single job per cluster. Having two ways to do the same thing is
> confusing.
> * Per-Job Mode is only supported by YARN anyway. If we keep it, we should
> work towards support in Kubernetes and Standalone, too, to reduce special
> casing.
> * Dropping per-job mode would reduce complexity in the code and allow us to
> dedicate more resources to the other two deployment modes.
> * I believe with session mode and application mode we have two easily
> distinguishable and understandable deployment modes that cover Flink's use
> cases:
>* session mode: olap-style, interactive jobs/queries, short lived batch
> jobs, very small jobs, traditional cluster-centric deployment mode (fits
> the "Hadoop world")
>* application mode: long-running streaming jobs, large scale &
> heterogeneous jobs (resource isolation!), application-centric deployment
> mode (fits the "Kubernetes world")
>
>
> *# Call to Action*
> * Do you use per-job mode? If so, why & would you be able to migrate to one
> of the other methods?
> * Am I missing any pros/cons?
> * Are you in favor of dropping per-job mode midterm?
>
> Cheers and thank you,
>
> Konstantin
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
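
For readers mapping the three modes described above onto configuration: the mode is selected via the execution.target option (or the equivalent -t flag of the CLI). A rough flink-conf.yaml sketch using the YARN targets as an example; the values are the documented ones, and the comments just restate the differences listed above:

# main() runs on the JobManager, dedicated cluster per application
execution.target: yarn-application

# main() runs in the client, jobs share a long-running cluster
# execution.target: yarn-session

# main() runs in the client, dedicated cluster per job (YARN only)
# execution.target: yarn-per-job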


Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Andrew Otto
Hello!  The Wikimedia Foundation is currently doing a similar evaluation
(although we are not currently including any Flink considerations).

https://wikitech.wikimedia.org/wiki/Data_Catalog_Application_Evaluation_Rubric

More details will be published there as folks keep working on this.
Hope that helps a little bit! :)

-Andrew Otto

On Thu, Jan 13, 2022 at 10:27 AM Martijn Visser 
wrote:

> Hi everyone,
>
> I'm currently checking out different metadata platforms, such as Amundsen
> [1] and Datahub [2]. In short, these types of tools try to address problems
> related to topics such as data discovery, data lineage and an overall data
> catalogue.
>
> I'm reaching out to the Dev and User mailing lists to get some feedback.
> It would really help if you could spend a couple of minutes to let me know
> if you already use either one of the two mentioned metadata platforms or
> another one, or are you evaluating such tools? If so, is that for
> the purpose as a catalogue, for lineage or anything else? Any type of
> feedback on these types of tools is appreciated.
>
> Best regards,
>
> Martijn
>
> [1] https://github.com/amundsen-io/amundsen/
> [2] https://github.com/linkedin/datahub
>
>
>


Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Pedro Silva
Hello,

I'm part of the DataHub community and working in collaboration with the
company behind it: http://acryldata.io
Happy to have a conversation or clarify any questions you may have on
DataHub :)

Have a nice day!

On Thu, Jan 13, 2022 at 15:33, Andrew Otto 
wrote:

> Hello!  The Wikimedia Foundation is currently doing a similar evaluation
> (although we are not currently including any Flink considerations).
>
>
> https://wikitech.wikimedia.org/wiki/Data_Catalog_Application_Evaluation_Rubric
>
> More details will be published there as folks keep working on this.
> Hope that helps a little bit! :)
>
> -Andrew Otto
>
> On Thu, Jan 13, 2022 at 10:27 AM Martijn Visser 
> wrote:
>
>> Hi everyone,
>>
>> I'm currently checking out different metadata platforms, such as Amundsen
>> [1] and Datahub [2]. In short, these types of tools try to address problems
>> related to topics such as data discovery, data lineage and an overall data
>> catalogue.
>>
>> I'm reaching out to the Dev and User mailing lists to get some feedback.
>> It would really help if you could spend a couple of minutes to let me know
>> if you already use either one of the two mentioned metadata platforms or
>> another one, or are you evaluating such tools? If so, is that for
>> the purpose as a catalogue, for lineage or anything else? Any type of
>> feedback on these types of tools is appreciated.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://github.com/amundsen-io/amundsen/
>> [2] https://github.com/linkedin/datahub
>>
>>
>>


[FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Martijn Visser
Hi everyone,

I'm currently checking out different metadata platforms, such as Amundsen
[1] and Datahub [2]. In short, these types of tools try to address problems
related to topics such as data discovery, data lineage and an overall data
catalogue.

I'm reaching out to the Dev and User mailing lists to get some feedback. It
would really help if you could spend a couple of minutes to let me know if
you already use either one of the two mentioned metadata platforms or
another one, or are you evaluating such tools? If so, is that for
the purpose as a catalogue, for lineage or anything else? Any type of
feedback on these types of tools is appreciated.

Best regards,

Martijn

[1] https://github.com/amundsen-io/amundsen/
[2] https://github.com/linkedin/datahub


Re: [DISCUSS] FLIP-210: Change logging level dynamically at runtime

2022-01-13 Thread Wenhao Ji
It seems that we have reached a consensus that the proposal will not
be implemented in Flink. I will mark the FLIP as discarded if there
are no objections.

Thanks, everyone, for joining the discussion again!

Wenhao


On Tue, Jan 11, 2022 at 11:12 PM Wenhao Ji  wrote:
>
> Hi all,
>
> Yes, indeed.
> After I did some investigation on similar features provided by the
> Cloud platforms, I actually found several popular Clouds have already
> offered this.
>
> - AWS Kinesis: Setting the Application Logging Level [1], which is
> implemented by UpdateApplication API [2].
> - Ververica: Logging & Metrics[3], by changing the template.
> - Alicloud: Configure job logs [4], which is quite similar to
> Ververica also by changing the template
> - Cloudera: Enabling Flink DEBUG logging[5], by changing the
> Configuration and triggering a restart
>
> It looks like this feature is not necessary. It has been developed in
> one way or another by so many platforms in the ecosystem.
>
> [1]: 
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/cloudwatch-logs.html#cloudwatch-level
> [2]: 
> https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_UpdateApplication.html
> [3]: https://docs.ververica.com/platform_operations/logging_metrics.html
> [4]: https://www.alibabacloud.com/help/doc-detail/173646.htm#title-1ay-hju-pka
> [5]: 
> https://docs.cloudera.com/csa/1.6.0/monitoring/topics/csa-ssb-enabling-debug-logging.html
>
> Thanks,
> Wenhao
>
> On Tue, Jan 11, 2022 at 8:24 PM Martijn Visser  wrote:
> >
> > Hi all,
> >
> > I agree with Konstantin, this feels like a problem that shouldn't be solved
> > via Apache Flink but via the logging ecosystem itself.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 11 Jan 2022 at 13:11, Konstantin Knauf  wrote:
> >
> > > I've now read over the discussion on the ticket, and I am personally not 
> > > in
> > > favor of adding this functionality to Flink via the REST API or Web UI. I
> > > believe that changing the logging configuration via the existing
> > > configuration files (log4j or logback) is good enough, to justify not
> > > increasing the scope of Flink in that direction. As you specifically
> > > mention YARN: doesn't Cloudera's Hadoop platform, for example, offer means
> > > to manage the configuration files for all worker nodes from a central
> > > configuration management system? It overall feels like we are trying to
> > > solve a problem in Apache Flink that is already solved in its ecosystem 
> > > and
> > > increases the scope of the project without adding core value. I also 
> > > expect
> > > that over time the exposed logging configuration options would become more
> > > and more complex.
> > >
> > > I am curious to hear what others think.
> > >
> > > On Tue, Jan 11, 2022 at 10:34 AM Chesnay Schepler 
> > > wrote:
> > >
> > > > Reloading the config from the filesystem  is already enabled by default;
> > > > that was one of the things that made us switch to Log4j 2.
> > > >
> > > > The core point of contention w.r.t. this topic is whether having the
> > > > admin ssh into the machine is too inconvenient.
> > > >
> > > > Personally I still think that the current capabilities are
> > > > sufficient, and I do not want us to rely on internals of the logging
> > > > backends in production code.
> > > >
> > > > On 10/01/2022 17:26, Konstantin Knauf wrote:
> > > > > Thank you for starting the discussion. Being able to change the 
> > > > > logging
> > > > > level at runtime is very valuable in my experience.
> > > > >
> > > > > Instead of introducing our own API (and eventually even persistence),
> > > > could
> > > > > we just periodically reload the log4j or logback configuration from 
> > > > > the
> > > > > environment/filesystem? I only quickly googled the topic and [1,2]
> > > > suggest
> > > > > that this might be possible?
> > > > >
> > > > > [1] https://stackoverflow.com/a/16216956/6422562?
> > > > > [2] https://logback.qos.ch/manual/configuration.html#autoScan
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jan 10, 2022 at 5:10 PM Wenhao Ji 
> > > > wrote:
> > > > >
> > > > >> Hi everyone,
> > > > >>
> > > > >> Hope you enjoyed the Holiday Season.
> > > > >>
> > > > > >> I would like to start the discussion on the improvement proposal
> > > > >> FLIP-210 [1] which aims to provide a way to change log levels at
> > > > >> runtime to simplify issues and bugs detection as reported in the
> > > > >> ticket FLINK-16478 [2].
> > > > >> Firstly, thanks Xingxing Di and xiaodao for their previous effort. 
> > > > >> The
> > > > >> FLIP I drafted is largely influenced by their previous designs 
> > > > >> [3][4].
> > > > >> Although we have reached some agreements under the jira comments 
> > > > >> about
> > > > >> the scope of this feature, we still have the following questions
> > > > >> listed below ready to be discussed in this thread.
> > > > >>
> > > > >> ## Question 1
> > > > >>
> > > > >>> Creating as custom DSL and implementing it f

[jira] [Created] (FLINK-25650) Document unaligned checkpoints performance limitations (larger records/flat map/timers/...)

2022-01-13 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25650:
-

 Summary: Document unaligned checkpoints performance limitations 
(larger records/flat map/timers/...)
 Key: FLINK-25650
 URL: https://issues.apache.org/jira/browse/FLINK-25650
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


The unaligned checkpoint can be delayed if the current record takes too long to 
consume (because it is too large, or it is a flat map, etc.), which can be pretty 
confusing. So it makes sense to document this limitation to give the user an 
understanding of this situation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25649) Scheduling jobs fails with org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException

2022-01-13 Thread Gil De Grove (Jira)
Gil De Grove created FLINK-25649:


 Summary: Scheduling jobs fails with 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
 Key: FLINK-25649
 URL: https://issues.apache.org/jira/browse/FLINK-25649
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.1
Reporter: Gil De Grove


Following comment from Till on this [SO 
question|https://stackoverflow.com/questions/70683048/scheduling-jobs-fails-with-org-apache-flink-runtime-jobmanager-scheduler-noresou?noredirect=1#comment124980546_70683048]
h2. *Summary*

We are currently experiencing a scheduling issue with our flink cluster.

The symptoms are that some/most/all (it depends, the symptoms are not always the 
same) of our tasks are shown as _SCHEDULED_ but fail after a timeout. The jobs 
are then shown as _RUNNING_.

The failing exception is the following one:

{{Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout}}

After analysis, we assume (we cannot prove it, as there are not that many logs 
for that part of the code) that the failure is due to a deadlock/race condition 
that is happening when several jobs are being submitted at the same time to the 
flink cluster, even though we have enough slots available in the cluster.

We actually have the error with 52 available task slots, and have 12 jobs that 
are not scheduled.
h2. Additional information
 * Flink version: 1.13.1 commit a7f3192
 * Flink cluster in session mode
 * 2 Job managers using k8s HA mode (resource requests: 2 CPU, 4Gb Ram, limits 
sets on memory to 4Gb)
 * 50 task managers with 2 slots each (resource requests: 2 CPUs, 2GB Ram. No 
limits set).
 * Our Flink cluster is shut down every night, and restarted every morning. The 
error seems to occur when a lot of jobs need to be scheduled. The jobs are 
configured to restore their state, and we do not see any issues for jobs that 
are being scheduled and run correctly, it seems to really be related to a 
scheduling issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-01-13 Thread Gabor Somogyi
Just to confirm keeping "security.kerberos.fetch.delegation-token" is added
to the doc.

BR,
G


On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi 
wrote:

> Hi  JunFan,
>
> > By the way, maybe this should be added in the migration plan or
> intergation section in the FLIP-211.
>
> Going to add this soon.
>
> > Besides, I have a question that the KDC will collapse when the cluster
> reached 200 nodes you described
> in the google doc. Do you have any attachment or reference to prove it?
>
> "KDC *may* collapse under some circumstances" is the proper wording.
>
> We have several customers who are executing workloads on Spark/Flink. Most
> of the time I'm facing their
> daily issues which is heavily environment and use-case dependent. I've
> seen various cases:
> * where the mentioned ~1k nodes were working fine
> * where KDC thought the number of requests are coming from DDOS attack so
> discontinued authentication
> * where KDC was simply not responding because of the load
> * where KDC intermittently had some outages (this was the nastiest
> thing)
>
> Since you're managing a relatively big cluster, you know that KDC is not
> only used by Spark/Flink workloads
> but is hit by the whole company IT infrastructure, so it really depends
> on other factors too whether KDC is reaching
> its limit or not. Not sure what kind of evidence you are looking for, but
> I'm not authorized to share any information about
> our clients data.
>
> One thing is for sure: the more external system types used in
> workloads (for ex. HDFS, HBase, Hive, Kafka)
> authenticate through KDC, the more likely it is to reach this
> threshold when the cluster is big enough.
>
> All in all this feature is here to help all users never reach this
> limitation.
>
> BR,
> G
>
>
> On Thu, Jan 13, 2022 at 1:00 PM 张俊帆  wrote:
>
>> Hi G
>>
>> Thanks for your quick reply. I think reserving the config of
>> *security.kerberos.fetch.delegation-token*
>> and simplifying how the token fetching is disabled is a good idea. By the way,
>> maybe this should be added
>> in the migration plan or integration section in FLIP-211.
>>
>> Besides, I have a question about the KDC collapsing when the cluster
>> reaches 200 nodes, as you described
>> in the google doc. Do you have any attachment or reference to prove it?
>> Because in our internal per-cluster setup,
>> the node count reaches > 1000 and KDC looks good. Did I miss or misunderstand
>> something? Please correct me.
>>
>> Best
>> JunFan.
>> On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:
>> >
>> >
>> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
>>
>


[jira] [Created] (FLINK-25648) Redundant to querying deployment when creating task manager pod

2022-01-13 Thread Yuan Huang (Jira)
Yuan Huang  created FLINK-25648:
---

 Summary: Redundant to querying deployment when creating task 
manager pod
 Key: FLINK-25648
 URL: https://issues.apache.org/jira/browse/FLINK-25648
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.14.2
Reporter: Yuan Huang 
 Attachments: trace.png

When creating a Task Manager (TM) pod, the fabric8 client needs the deployment to 
set the owner reference for the TM pod. However, querying the deployment again 
each time is unnecessary and wastes a lot of time.

According to the measured result (obtained with 
https://arthas.aliyun.com/doc/trace.html#id1) below: querying the deployment may 
cost more than 50% (203/349 ms) of the time.

Can we save the deployment reference in memory instead of querying it each time, 
to save pod creation time?

!trace.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25647) Improve JSON format handling and support

2022-01-13 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25647:
--

 Summary: Improve JSON format handling and support
 Key: FLINK-25647
 URL: https://issues.apache.org/jira/browse/FLINK-25647
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Martijn Visser


This is an umbrella issue to track improvements how Flink supports and handles 
the JSON format.

There are no sub-tasks yet. If you think there's already an existing ticket 
that should be considered, please let me know so it can be linked/converted to 
a subtask. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-01-13 Thread Gabor Somogyi
Hi  JunFan,

> By the way, maybe this should be added in the migration plan or
intergation section in the FLIP-211.

Going to add this soon.

> Besides, I have a question that the KDC will collapse when the cluster
reached 200 nodes you described
in the google doc. Do you have any attachment or reference to prove it?

"KDC *may* collapse under some circumstances" is the proper wording.

We have several customers who are executing workloads on Spark/Flink. Most
of the time I'm facing their
daily issues which is heavily environment and use-case dependent. I've seen
various cases:
* where the mentioned ~1k nodes were working fine
* where KDC thought the number of requests are coming from DDOS attack so
discontinued authentication
* where KDC was simply not responding because of the load
* where KDC intermittently had some outages (this was the nastiest
thing)

Since you're managing a relatively big cluster, you know that KDC is not
only used by Spark/Flink workloads
but is hit by the whole company IT infrastructure, so it really depends
on other factors too whether KDC is reaching
its limit or not. Not sure what kind of evidence you are looking for, but
I'm not authorized to share any information about
our clients data.

One thing is for sure: the more external system types used in workloads
(for ex. HDFS, HBase, Hive, Kafka)
authenticate through KDC, the more likely it is to reach this threshold
when the cluster is big enough.

All in all this feature is here to help all users never reach this
limitation.

BR,
G


On Thu, Jan 13, 2022 at 1:00 PM 张俊帆  wrote:

> Hi G
>
> Thanks for your quick reply. I think reserving the config of
> *security.kerberos.fetch.delegation-token*
> and simplifying how the token fetching is disabled is a good idea. By the way,
> maybe this should be added
> in the migration plan or integration section in FLIP-211.
>
> Besides, I have a question about the KDC collapsing when the cluster
> reaches 200 nodes, as you described
> in the google doc. Do you have any attachment or reference to prove it?
> Because in our internal per-cluster setup,
> the node count reaches > 1000 and KDC looks good. Did I miss or misunderstand
> something? Please correct me.
>
> Best
> JunFan.
> On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:
> >
> >
> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
>


[jira] [Created] (FLINK-25646) Document buffer debloating issues with high parallelism

2022-01-13 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25646:
-

 Summary: Document buffer debloating issues with high parallelism
 Key: FLINK-25646
 URL: https://issues.apache.org/jira/browse/FLINK-25646
 Project: Flink
  Issue Type: Improvement
Reporter: Anton Kalashnikov


According to recent benchmarks, there are some problems with buffer debloating when 
a job has high parallelism. What counts as high parallelism differs from 
job to job, but in general it is more than 200. So it makes sense to document 
that problem and propose the solution - increasing the number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-01-13 Thread 张俊帆
Hi G

Thanks for your quick reply. I think reserving the config of 
*security.kerberos.fetch.delegation-token*
and simplifying how the token fetching is disabled is a good idea. By the way, maybe 
this should be added
in the migration plan or integration section in FLIP-211.

Besides, I have a question about the KDC collapsing when the cluster reaches 
200 nodes, as you described
in the google doc. Do you have any attachment or reference to prove it? Because 
in our internal per-cluster setup,
the node count reaches > 1000 and KDC looks good. Did I miss or misunderstand 
something? Please correct me.

Best
JunFan.
On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:
>
> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ


Re: [DISCUSS] FLIP-203: Incremental savepoints

2022-01-13 Thread Piotr Nowojski
Hi,

Thanks for the comments and questions. Starting from the top:

Seth: good point about schema evolution. Actually, I have a very similar
question to State Processor API. Is it the same scenario in this case?
Should it also be working with checkpoints but might be just untested?

And next question, should we commit to supporting those two things (State
Processor API and schema evolution) for native savepoints? What about
aligned checkpoints? (please check [1] for that).

Yu Li: 1, 2 and 4 done.

> 3. How about changing the description of "the default configuration of the
> checkpoints will be used to determine whether the savepoint should be
> incremental or not" to something like "the `state.backend.incremental`
> setting now denotes the type of native format snapshot and will take
effect
> for both checkpoint and savepoint (with native type)", to prevent concept
> confusion between checkpoint and savepoint?

Is `state.backend.incremental` the only configuration parameter that can be
used in this context? I would guess not? What about for example
"state.storage.fs.memory-threshold" or all of the Advanced RocksDB State
Backends Options [2]?

David:

> does this mean that we need to keep the checkpoints compatible across
minor
> versions? Or can we say, that the minor version upgrades are only
> guaranteed with canonical savepoints?

Good question. Frankly I was always assuming that this is implicitly given.
Otherwise users would not be able to recover jobs that are failing because
of bugs in Flink. But I'm pretty sure that was never explicitly stated.

As Konstantin suggested, I've written down the pre-existing guarantees of
checkpoints and savepoints followed by two proposals on how they should be
changed [1]. Could you take a look?

I'm especially unsure about the following things:
a) What about RocksDB upgrades? If we bump RocksDB version between Flink
versions, do we support recovering from a native format snapshot
(incremental checkpoint)?
b) State Processor API - both pre-existing and what do we want to provide
in the future
c) Schema Evolution - both pre-existing and what do we want to provide in
the future

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Checkpointvssavepointguarantees
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-rocksdb-state-backends-options

On Tue, Jan 11, 2022 at 09:45 Konstantin Knauf  wrote:

> Hi Piotr,
>
> would it be possible to provide a table that shows the
> compatibility guarantees provided by the different snapshots going forward?
> Like type of change (Topology. State Schema, Parallelism, ..) in one
> dimension, and type of snapshot as the other dimension. Based on that, it
> would be easier to discuss those guarantees, I believe.
>
> Cheers,
>
> Konstantin
>
> On Mon, Jan 3, 2022 at 9:11 AM David Morávek  wrote:
>
> > Hi Piotr,
> >
> > does this mean that we need to keep the checkpoints compatible across
> minor
> > versions? Or can we say, that the minor version upgrades are only
> > guaranteed with canonical savepoints?
> >
> > My concern is especially if we'd want to change layout of the checkpoint.
> >
> > D.
> >
> >
> >
> > On Wed, Dec 29, 2021 at 5:19 AM Yu Li  wrote:
> >
> > > Thanks for the proposal Piotr! Overall I'm +1 for the idea, and below
> are
> > > my two cents:
> > >
> > > 1. How about adding a "Term Definition" section and clarify what
> "native
> > > format" (the "native" data persistence format of the current state
> > backend)
> > > and "canonical format" (the "uniform" format that supports switching
> > state
> > > backends) means?
> > >
> > > 2. IIUC, currently the FLIP proposes to only support incremental
> > savepoint
> > > with native format, and there's no plan to add such support for
> canonical
> > > format, right? If so, how about writing this down explicitly in the
> FLIP
> > > doc, maybe in a "Limitations" section, plus the fact that
> > > `HashMapStateBackend` cannot support incremental savepoint before
> > FLIP-151
> > > is done? (side note: @Roman just a kindly reminder, that please take
> > > FLIP-203 into account when implementing FLIP-151)
> > >
> > > 3. How about changing the description of "the default configuration of
> > the
> > > checkpoints will be used to determine whether the savepoint should be
> > > incremental or not" to something like "the `state.backend.incremental`
> > > setting now denotes the type of native format snapshot and will take
> > effect
> > > for both checkpoint and savepoint (with native type)", to prevent
> concept
> > > confusion between checkpoint and savepoint?
> > >
> > > 4. How about putting the notes of behavior change (the default type of
> > > savepoint will be changed to `native` in the future, and by then the
> > taken
> > > savepoint cannot be used to switch state backends by default) to a more
> > > obvious place, for example moving from the "CLI" section 

Re: [VOTE] Release 1.14.3, release candidate #1

2022-01-13 Thread Xingbo Huang
+1 (non-binding)

- Verified checksums and signatures
- Verified Python wheel package contents
- Pip install apache-flink-libraries source package and apache-flink wheel
package in Mac
- Run the examples from Python Table API Tutorial[1] in Python REPL

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/table_api_tutorial/

Best,
Xingbo

Jingsong Li  wrote on Thu, Jan 13, 2022 at 10:05:

> +1 (non-binding)
>
> - Verified checksums and signatures
> - Build from source
> - Start a standalone cluster, web is OK
> - Start sql-client, everything looks good
>
> Best,
> Jingsong
>
> On Wed, Jan 12, 2022 at 11:00 PM Till Rohrmann 
> wrote:
> >
> > +1 (binding)
> >
> > - Verified checksums and signatures
> > - Build from source and ran StateMachineExample
> > - Reviewed the flink-web PR
> > - Verified that there were no other dependency changes than
> testcontainer,
> > japicmp-plugin and log4j.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jan 12, 2022 at 9:58 AM Yun Tang  wrote:
> >
> > > +1 (non-binding)
> > >
> > >
> > >   *   Checked the signature of source code, some of binaries and some
> of
> > > python packages.
> > >   *   Launch a local cluster successfully on linux machine with correct
> > > flink-version and commit id and run the state machine example
> successfully
> > > as expected.
> > >   *   Reviewed the flink-web PR.
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Martijn Visser 
> > > Sent: Wednesday, January 12, 2022 0:34
> > > To: dev 
> > > Subject: [VOTE] Release 1.14.3, release candidate #1
> > >
> > > Hi everyone,
> > > Please review and vote on the release candidate #1 for the version
> > > 1.14.3, as follows:
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source release and binary convenience releases to
> > > be deployed to dist.apache.org [2], which are signed with the key with
> > > fingerprint 12DEE3E4D920A98C [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > * source code tag "release-1.14.3-rc1" [5],
> > > * website pull request listing the new release and adding announcement
> > > blog post [6].
> > >
> > > The vote will be open for at least 72 hours. It is adopted by majority
> > > approval, with at least 3 PMC affirmative votes.
> > >
> > > Thanks on behalf of Thomas Weise and myself,
> > >
> > > Martijn Visser
> > > http://twitter.com/MartijnVisser82
> > >
> > > [1]
> > >
> > >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351075&projectId=12315522
> > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.14.3-rc1/
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1481/
> > > [5] https://dist.apache.org/repos/dist/dev/flink/flink-1.14.3-rc1/
> > > [6] https://github.com/apache/flink-web/pull/497
> > >
>


[jira] [Created] (FLINK-25645) UnsupportedOperationException would thrown out when hash shuffle by a field with array type

2022-01-13 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25645:
--

 Summary: UnsupportedOperationException would thrown out when hash 
shuffle by a field with array type
 Key: FLINK-25645
 URL: https://issues.apache.org/jira/browse/FLINK-25645
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang
 Attachments: image-2022-01-13-19-12-40-756.png, 
image-2022-01-13-19-15-28-395.png

Currently array type is not supported as hash shuffle key because CodeGen does 
not support it yet.
 !image-2022-01-13-19-15-28-395.png! 

An UnsupportedOperationException would be thrown when hash shuffling by a field 
with array type,
 !image-2022-01-13-19-12-40-756.png! 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25644) Introduce interfaces between file-table-store and flink connector sink

2022-01-13 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-25644:


 Summary: Introduce interfaces between file-table-store and flink 
connector sink
 Key: FLINK-25644
 URL: https://issues.apache.org/jira/browse/FLINK-25644
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.1.0


We should introduce a FileStore interface to provide operations:
 * provides FileStoreWrite: Write operation which provides RecordWriter 
creation.
 * provides FileStoreCommit: Commit operation which provides commit and 
overwrite.
 * provides FileStoreExpire: Expire operation which provides snapshots expire.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25643) Introduce Predicate to table store

2022-01-13 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-25643:


 Summary: Introduce Predicate to table store
 Key: FLINK-25643
 URL: https://issues.apache.org/jira/browse/FLINK-25643
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.1.0


Flink Expression is not serializable. Although it has asSerializableString, the 
method is not implemented by all Expressions, and the deserialization requires 
many parameters.

So table store introduces Predicate to be a serializable class to do filter and 
partition push down.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-01-13 Thread Gabor Somogyi
Hi Junfan,

Thanks for investing your time to make this feature better.
I've had a look at FLINK-21700 and now I think I see your point (plz
correct me if I misunderstood something).

According to the current plans *security.kerberos.fetch.delegation-token* is
intended to be removed
because *security.kerberos.tokens.${provider}.enabled* would provide more
fine-grained possibilities.
However, this would not be super convenient from the Oozie perspective because
one must know all
available token provider names (which may change over time) to turn them all
off. If I understand the problem correctly,
then the mentioned use-case justifies not removing
*security.kerberos.fetch.delegation-token*.

I tend to agree to keep the global flag and simplify the external token
handling use-case from a config perspective.

Waiting on your opinion...

BR,
G
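
To make the two kinds of switches discussed here concrete, a rough flink-conf.yaml sketch. The per-provider key follows the pattern proposed in FLIP-211, with "hbase" only as an illustrative provider name, so both the key pattern and the provider name may still change:

# existing global switch, e.g. for the Oozie use case above
security.kerberos.fetch.delegation-token: false

# proposed fine-grained per-provider switch (name not final)
security.kerberos.tokens.hbase.enabled: false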


On Thu, Jan 13, 2022 at 3:42 AM 张俊帆  wrote:

> Hi G,
>
> Thanks for starting the discussion. I think this is an important
> improvement for Flink.
> The proposal looks good to me. And I focus on one point.
>
> 1. I hope we keep consistency with the current implementation; we rely
> on the config
> 'security.kerberos.fetch.delegation-token' to submit Flink Batch
> Actions in Oozie.
> More details could be found in FLINK-21700
>
> Looking forward to your implementations.
>
> Best
> JunFan.
> On Jan 12, 2022, 4:03 AM +0800, Márton Balassi ,
> wrote:
> > Hi G,
> >
> > Thanks for taking this challenge on. Scalable Kerberos authentication
> > support is important for Flink, delegation tokens is a great mechanism to
> > future-proof this. I second your assessment that the existing
> > implementation could use some improvement too and like the approach you
> > have outlined. It is crucial that the changes are self-contained and will
> > not affect users that do not use Kerberos, while are minimal for the ones
> > who do (configuration values change, but the defaults just keep working
> in
> > most cases).
> >
> > Thanks,
> > Marton
> >
> > On Tue, Jan 11, 2022 at 2:59 PM Gabor Somogyi  >
> > wrote:
> >
> > > Hi All,
> > >
> > > Hope all of you have enjoyed the holiday season.
> > >
> > > I would like to start the discussion on FLIP-211
> > > <
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework
> > > >
> > > which
> > > aims to provide a
> > > Kerberos delegation token framework that /obtains/renews/distributes
> tokens
> > > out-of-the-box.
> > >
> > > Please be aware that the FLIP wiki area is not fully done since the
> > > discussion may
> > > change the feature in major ways. The proposal can be found in a
> google doc
> > > here
> > > <
> > >
> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
> > > >
> > > .
> > > As the community agrees on the approach the content will be moved to
> the
> > > wiki page.
> > >
> > > Feel free to add your thoughts to make this feature better!
> > >
> > > BR,
> > > G
> > >
>


Re: [DISCUSS] FLIP-208: Update KafkaSource to detect EOF based on de-serialized record

2022-01-13 Thread Qingsheng Ren
Thanks Dong for the explanation!

I agree with Dong’s idea of keeping the consistency of APIs for setting 
configurations, so I think it’s acceptable for me to pass the record evaluator 
from XXXSourceBuilder and embed it into SourceReaderBase. Also, considering the 
current usage of DeserializationSchema#isEndOfStream (only the Kafka source 
respects this interface), it's OK to implement just the Kafka and Pulsar connectors 
for now.

Another thing I’d like to mention is about using this feature in Table/SQL API. 
Currently I have two kinds of implementations in my mind: 

1. Similar to format / deserializer, we introduce a factory for 
RecordEvaluator, and users need to specify the factory identifier in table 
options: 

CREATE TABLE `kafka` (…) WITH (
`connector` = `kafka`,
`record.evaluator` =  `my-evaluator-factory-identifier`
)

2. Directly use full class path in table options:

CREATE TABLE `kafka` (…) WITH (
`connector` = `kafka`,
`record.evaluator.class` =  `com.mycompany.evaluator.MyEvaluator`
)

Personally I prefer the second one, because it’s easier for users to implement 
their own RecordEvaluators.

What do you think?


> On Jan 13, 2022, at 11:39 AM, Dong Lin  wrote:
> 
> Hi Fabian,
> 
> Thank you for the explanation.
> 
> The current approach needs to add new constructors for SourceReaderBase
> and SingleThreadMultiplexSourceReaderBase. This proposed change has now
> been included in the Public Interfaces section in the FLIP.
> 
> And yes, I also agree it would be preferred if developers do not have to
> change their SourceReaders to implement this new logic. The suggestion
> mentioned by Qingsheng in this thread could achieve this goal. Qingsheng's
> idea is to let the user specify the eofRecordEvaluator via
> StreamExecutionEnvironment::fromSource(...).withEofRecordEvaluator(...) and
> pass this evaluator through DataStreamSource to SourceOperator. And
> SourceOperator::emitNext(...) could use this evaluator as appropriate.
> 
> For now I have chosen not to use this approach because this approach
> requires users to pass some source configuration via
> StreamExecutionEnvironment::fromSource(...) and some other source
> configuration via e.g. KafkaSourceBuilder(...). This might create a sense
> of inconsistency/confusion. Given that the number of connector users is
> much larger than the number of connector developers, I believe it is probably
> better to optimize the user experience in this case.
> 
> The description of this alternative approach and its pros/cons has been
> included in the FLIP.
> 
> And yep, I think I understand your suggestion. Indeed those connector
> configs (e.g. de-serializer, boundedness, eofRecordEvaluator) can be passed
> from XXXSourceBuilder to their shared infra (e.g. SourceReaderBase). Both
> solutions work. Given that the existing configs (e.g. serializer) are
> already passed to SourceReaderBase via the constructor parameter, I guess
> it is simpler to follow the existing pattern for now.
> 
> Regards,
> Dong
> 
> 
> On Wed, Jan 12, 2022 at 11:17 PM Fabian Paul  wrote:
> 
>> Hi Dong,
>> 
>> I think I am beginning to understand your idea. Since SourceReaderBase
>> is marked as PublicEvolving can you also update the FLIP with the
>> changes you want to make to it? Ideally, connector developers do not
>> have to change their SourceReaders to implement this new logic.
>> 
>> My idea was to introduce a second source interface that extends the
>> existing interface and offers only the method getRecordEvaluator().
>> The record evaluator is still passed as you have described through the
>> builder and at the end held by the source object. This way the source
>> framework can automatically use the evaluator without the need that
>> connector developers have to implement the complicated stopping logic
>> or change their SourceReaders.
>> 
>> Best,
>> Fabian
>> 
>> 
>> On Wed, Jan 12, 2022 at 2:22 AM Dong Lin  wrote:
>>> 
>>> Hi Fabian,
>>> 
>>> Thanks for the comments. Please see my reply inline.
>>> 
>>> On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul  wrote:
>>> 
 Hi Dong,
 
 I wouldn't change the org.apache.flink.api.connector.source.Source
 interface because it either breaks existing sinks or we introduce it
 as some kind of optional. I deem both options as not great. My idea is
 to introduce a new interface that extends the Source. This way users
 who want to develop a source that stops with the record evaluator can
 implement the new interface. It also has the nice benefit that we can
 give this new type of source a lower stability guarantee than Public
 to allow some changes.
 
>>> 
>>> Currently the eofRecordEvaluator can be passed from
>>> KafkaSourceBuilder/PulsarSourceBuilder
>>> to SingleThreadMultiplexSourceReaderBase and SourceReaderBase. This
>>> approach also allows developers who want to develop a source that stops
>>> with the record evaluator to implement the new feature. Adding a new
>>> interface could incr
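
To make the builder-based variant discussed in this thread more concrete, a sketch of how it might look for the Kafka source. The setEofRecordEvaluator hook and its record-evaluator callback are the FLIP-208 proposal, not an existing Flink API, so the names and signatures may still change; the other builder calls are the existing KafkaSource API:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class EofEvaluatorSketch {
    public static void main(String[] args) {
        KafkaSource<String> source =
            KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setGroupId("eof-demo")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Proposed hook: stop reading a split once a de-serialized
                // record marks the end of the stream.
                .setEofRecordEvaluator(record -> "END_OF_STREAM".equals(record))
                .build();
    }
}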

Re: [DISCUSS] Future of Per-Job Mode

2022-01-13 Thread Biao Geng
Hi Konstantin,

Thanks a lot for starting this discussion! I hope my thoughts and
experiences on why users use Per-Job Mode, especially on YARN, can help:
#1. Per-job mode makes managing dependencies easier: I have met some
customers who used Per-Job Mode to submit jobs with a lot of local
user-defined jars by using '-C' option directly. They do not need to upload
these jars to some remote file system(e.g. HDFS) first, which makes their
life easier.
#2. In YARN mode, currently, there are some limitations of Application Mode:
in this jira (https://issues.apache.org/jira/browse/FLINK-24897) that I am
working on, we found that YARN Application Mode does not support `usrlib` very
well, which makes it hard to use FlinkUserCodeClassLoader to load classes
in user-defined jars.

I believe the above 2 points, especially #2, can be addressed as we enhance
YARN Application Mode later, but I think it is worthwhile to consider
dependency management more carefully before we make decisions.

Best,
Biao Geng


Konstantin Knauf  wrote on Thu, Jan 13, 2022 at 16:32:

> Hi everyone,
>
> I would like to discuss and understand if the benefits of having Per-Job
> Mode in Apache Flink outweigh its drawbacks.
>
>
> *# Background: Flink's Deployment Modes*
> Flink currently has three deployment modes. They differ in the following
> dimensions:
> * main() method executed on Jobmanager or Client
> * dependencies shipped by client or bundled with all nodes
> * number of jobs per cluster & relationship between job and cluster
> lifecycle
> * (supported resource providers)
>
> ## Application Mode
> * main() method executed on Jobmanager
> * dependencies already need to be available on all nodes
> * dedicated cluster for all jobs executed from the same main()-method
> (Note: applications with more than one job currently still have significant
> limitations, like missing high-availability). Technically, a session cluster
> dedicated to all jobs submitted from the same main() method.
> * supported by standalone, native kubernetes, YARN
>
> ## Session Mode
> * main() method executed in client
> * dependencies are distributed from and by the client to all nodes
> * cluster is shared by multiple jobs submitted from different clients,
> independent lifecycle
> * supported by standalone, Native Kubernetes, YARN
>
> ## Per-Job Mode
> * main() method executed in client
> * dependencies are distributed from and by the client to all nodes
> * dedicated cluster for a single job
> * supported by YARN only
>
>
> *# Reasons to Keep*
> * There are use cases where you might need the combination of a single job
> per cluster, but main() method execution in the client. This combination is
> only supported by per-job mode.
> * It currently exists. Existing users will need to migrate to either
> session or application mode.
>
>
> *# Reasons to Drop*
> * With Per-Job Mode and Application Mode we have two modes that for most
> users probably do the same thing. Specifically, for
> those users that don't care where the main() method is executed and want to
> submit a single job per cluster. Having two ways to do the same thing is
> confusing.
> * Per-Job Mode is only supported by YARN anyway. If we keep it, we should
> work towards support in Kubernetes and Standalone, too, to reduce special
> casing.
> * Dropping per-job mode would reduce complexity in the code and allow us
> to dedicate more resources to the other two deployment modes.
> * I believe with session mode and application mode we have two easily
> distinguishable and understandable deployment modes that cover Flink's use
> cases:
>* session mode: olap-style, interactive jobs/queries, short lived batch
> jobs, very small jobs, traditional cluster-centric deployment mode (fits
> the "Hadoop world")
>* application mode: long-running streaming jobs, large scale &
> heterogeneous jobs (resource isolation!), application-centric deployment
> mode (fits the "Kubernetes world")
>
>
> *# Call to Action*
> * Do you use per-job mode? If so, why & would you be able to migrate to
> one of the other methods?
> * Am I missing any pros/cons?
> * Are you in favor of dropping per-job mode midterm?
>
> Cheers and thank you,
>
> Konstantin
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


RE: OutOfMemoryError: Java heap space while implmentating flink sql api

2022-01-13 Thread Ronak Beejawat (rbeejawa)
Hi Martijn,

I posted the below query in both places (flink mailing list and stack overflow) 
to get a quick response on it.
Please let me know the exact poc / mailing list to post my queries to if it is 
causing trouble, so at least we can get a quick acknowledgement of the issues 
reported.

Ok let me ask the below question in a simpler way

Join 1

select * from cdrTable left join  ccmversionsumapTable cvsm ON 
(cdrTable.version = ccmversionsumapTable.ccmversion) group by 
TUMBLE(PROCTIME(), INTERVAL '1' MINUTE), ...
(2.5 million records left joined with 23 records fails to compute and throws a 
heap error)
Note: This is a small join example compared to the Join 2 condition shown below. 
Here we are using different connectors for reading: cdrTable -> kafka connector 
and ccmversionsumapTable -> jdbc connector

Join 2

select * from cdrTable left join cmrTable cmr on (cdr.org_id = 
cmr.org_id AND cdr.cluster_id = cmr.cluster_id AND 
cdr.globalcallid_callmanagerid = cmr.globalcallid_callmanagerid AND 
cdr.globalcallid_callid = cmr.globalcallid_callid AND 
(cdr.origlegcallidentifier = cmr.callidentifier OR cdr.destlegcallidentifier = 
cmr.callidentifier)), ... (2.5 million records left joined with 5 million records 
computes properly without any heap error)
Note: This is a bigger join example compared to the Join 1 condition shown 
above. Here we are using the same connector for reading cdrTable and cmrTable -> 
kafka connector

So the question is: why does the small join throw a heap error while the 
bigger join works properly? Please help us with this.

Thanks
Ronak Beejawat
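
One hedged pointer related to the state growth Martijn mentions below: for regular (non-windowed) joins, the Table API can bound how long state for idle keys is kept. Whether this is applicable here depends on the semantics the job needs (results can change if old keys reappear after the retention period), so treat this purely as a sketch:

import java.time.Duration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // State that has not been accessed for longer than this is dropped.
        tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
    }
}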

From: Martijn Visser mailto:mart...@ververica.com>>
Date: Wednesday, 12 January 2022 at 7:43 PM
To: dev mailto:dev@flink.apache.org>>
Cc: commun...@flink.apache.org 
mailto:commun...@flink.apache.org>>, 
u...@flink.apache.org 
mailto:u...@flink.apache.org>>, Hang Ruan 
mailto:ruanhang1...@gmail.com>>, Shrinath Shenoy K 
(sshenoyk) mailto:sshen...@cisco.com>>, Jayaprakash 
Kuravatti (jkuravat) mailto:jkura...@cisco.com>>, Krishna 
Singitam (ksingita) mailto:ksing...@cisco.com>>, Nabhonil 
Sinha (nasinha) mailto:nasi...@cisco.com>>, Vibhor Jain 
(vibhjain) mailto:vibhj...@cisco.com>>, Raghavendra Jsv 
(rjsv) mailto:r...@cisco.com>>, Arun Yadav (aruny) 
mailto:ar...@cisco.com>>, Avi Sanwal (asanwal) 
mailto:asan...@cisco.com>>
Subject: Re: OutOfMemoryError: Java heap space while implmentating flink sql api
Hi Ronak,

I would like to ask you to stop cross-posting to all the Flink mailing lists 
and then also post the same question to Stackoverflow. Both the mailing lists 
and Stackoverflow are designed for asynchronous communication and you should 
allow the community some days to address your question.

Joins are state heavy. As mentioned in the documentation [1] "Thus, the 
required state for computing the query result might grow infinitely depending 
on the number of distinct input rows of all input tables and intermediate join 
results."

Best regards,

Martijn

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/


On Wed, 12 Jan 2022 at 11:06, Ronak Beejawat (rbeejawa) 
mailto:rbeej...@cisco.com.invalid>> wrote:
Hi Team,

I was trying to implement a Flink SQL API join with 2 tables and it is throwing 
OutOfMemoryError: Java heap space. PFB screenshot for flink cluster memory 
details.
[Flink Memory Model][1]


  [1]: https://i.stack.imgur.com/AOnQI.png

**PFB below code snippet which I was trying:**
```
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);


tableEnv.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy",
 "TWO_PHASE");
tableEnv.getConfig().getConfiguration().setString("table.optimizer.join-reorder-enabled",
 "true");
tableEnv.getConfig().getConfiguration().setString("table.exec.resource.default-parallelism",
 "16");

tableEnv.executeSql("CREATE TEMPORARY TABLE ccmversionsumapTable (\r\n"
 + "  suname STRING\r\n"
 + "  ,ccmversion STRING\r\n"
 + "   )\r\n"
 + "   WITH (\r\n"
 + "   'connector' = 'jdbc'\r\n"
 + "   ,'url' = 
'jdbc:mysql://:3306/ccucdb'\r\n"
 + "   ,'table-name' = 
'ccmversionsumap'\r\n"
 + "   ,'username' = 
'*'\r\n"
 + "   ,'password' = ''\r\n"
 + "   )");

tableEnv.executeSql("CREATE TEMPORARY TABLE cdrTable (\r\n"
   + "   org_id STRING\r\n"
   

RE: Could not find any factory for identifier 'jdbc'

2022-01-13 Thread Ronak Beejawat (rbeejawa)
Hi Roman, Chesnay

PFB screenshot for jdbc connector availability in the bundled jar. As I mentioned 
earlier, it didn't work even then, so I tried putting it inside the flink lib 
directory as mentioned in the article link below, and that resolved the issue.



[cid:image001.png@01D80864.522974B0]


[cid:image002.png@01D80864.522974B0]
@Roman - I also tried with flink-connector-jdbc_2.12; it didn't work.

Thanks
Ronak Beejawat
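
One hedged pointer that may explain why a connector that is visibly inside the bundled jar is still not found: Flink discovers table factories through Java's ServiceLoader, so the META-INF/services files of all bundled connectors have to be merged when the fat jar is built. With the maven-shade-plugin that is done by the ServicesResourceTransformer; a sketch of the relevant pom.xml fragment (versions and the rest of the shade configuration omitted):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- merge META-INF/services files so every connector's factory stays discoverable -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

If the services files were not merged, whichever connector's file happened to win would still work (e.g. kafka) while the others would not, which matches the symptom described above.
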
From: Roman Khachatryan mailto:ro...@apache.org>>
Date: Wednesday, 12 January 2022 at 6:57 PM
To: commun...@flink.apache.org 
mailto:commun...@flink.apache.org>>
Cc: dev mailto:dev@flink.apache.org>>, Ronak Beejawat 
(rbeejawa) mailto:rbeej...@cisco.com.invalid>>, 
u...@flink.apache.org 
mailto:u...@flink.apache.org>>, Hang Ruan 
mailto:ruanhang1...@gmail.com>>, Shrinath Shenoy K 
(sshenoyk) mailto:sshen...@cisco.com>>, Karthikeyan 
Muthusamy (karmuthu) mailto:karmu...@cisco.com>>, Krishna 
Singitam (ksingita) mailto:ksing...@cisco.com>>, Arun Yadav 
(aruny) mailto:ar...@cisco.com>>, Jayaprakash Kuravatti 
(jkuravat) mailto:jkura...@cisco.com>>, Avi Sanwal 
(asanwal) mailto:asan...@cisco.com>>
Subject: Re: Could not find any factory for identifier 'jdbc'
Hi,

I think Chesnay's suggestion to double-check the bundle makes sense.
Additionally, I'd try flink-connector-jdbc_2.12 instead of
flink-connector-jdbc_2.11.

Regards,
Roman

On Wed, Jan 12, 2022 at 12:23 PM Chesnay Schepler 
mailto:ches...@apache.org>> wrote:
>
> I would try double-checking whether the jdbc connector was truly bundled
> in your jar, specifically whether
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory is.
>
> I can't think of a reason why this shouldn't work for the JDBC connector.
>
> On 12/01/2022 06:34, Ronak Beejawat (rbeejawa) wrote:
> > Hi Chesnay,
> >
> > How do you ensure that the connector is actually available at runtime?
> >
> > We are providing the below-mentioned dependency inside pom.xml with scope 
> > compile, so it will be available on the class path, and it was there in my flink 
> > job's bundled jar. We are doing the same for another connector, say kafka, and 
> > it worked for that
> >
> > 
> >org.apache.flink
> >flink-connector-jdbc_2.11
> >1.14.2
> > 
> > 
> >mysql
> >mysql-connector-java
> >5.1.41
> > 
> >
> > Are you bundling it in a jar or putting it into Flinks lib directory?
> > Yes, we are building a jar and it is bundled with that, but we still saw this 
> > error. So we tried the workaround mentioned in an article, putting it 
> > inside the flink lib directory, and then it worked: 
> > https://blog.csdn.net/weixin_44056920/article/details/118110949 . So this 
> > is extra stuff which we have to do to make it work, with a restart of the cluster.
> >
> > But the question is: why did it work for Kafka and not for JDBC? I didn't put the
> > Kafka jar explicitly into the Flink lib folder.
> >
> > Note: I am using the Flink 1.14 release for all my job execution/implementation,
> > which I believe is a stable version.
> >
> > Thanks
> > Ronak Beejawat
> > From: Chesnay Schepler <ches...@apache.org>
> > Date: Tuesday, 11 January 2022 at 7:45 PM
> > To: Ronak Beejawat (rbeejawa) <rbeej...@cisco.com.INVALID>, u...@flink.apache.org
> > Cc: Hang Ruan <ruanhang1...@gmail.com>, Shrinath Shenoy K (sshenoyk) <sshen...@cisco.com>,
> >  Karthikeyan Muthusamy (karmuthu) <karmu...@cisco.com>, Krishna Singitam (ksingita) <ksing...@cisco.com>,
> >  Arun Yadav (aruny) <ar...@cisco.com>, Jayaprakash Kuravatti (jkuravat) <jkura...@cisco.com>,
> >  Avi Sanwal (asanwal) <asan...@cisco.com>
> > Subject: Re: Could not find any factory for identifier 'jdbc'
> > How do you ensure that the connector is actually available at runtime?
> > Are you bundling it in a jar or putting it into Flinks lib directory?
> >
> > On 11/01/2022 14:14, Ronak Beejawat (rbeejawa) wrote:
> >> Correcting subject -> Could not find any factory for identifier 'jdbc'
> >>
> >> From: Ronak Beejawat (rbeejawa)
> >> Sent: Tuesday, January 11, 2022 6:43 PM
> >> To: 'dev@flink.apache.org'

[DISCUSS] Merge flink-connector-testing into flink-connector-test-utils

2022-01-13 Thread Qingsheng Ren
Hi all,

I’d like to start a discussion about merging two utility modules, 
"flink-connector-testing" and "flink-connector-test-utils". Both modules are 
designed to provide connector testing infrastructure and helper classes, so 
merging them could simplify connector dependencies and reduce the complexity of 
the Flink project.

The plan is to move the classes in flink-connector-testing to 
flink-connector-test-utils, since the latter has existed for a longer time in the 
project, and flink-connector-testing is currently in experimental status.

Currently these two modules work as follows: 

"flink-connector-test-utils" provides utilization classes supporting unit tests 
for connector, such as TestingReaderContext and TestingSplitEnumeratorContext, 
and a test suite SourceReaderTestBase with general test cases for source 
readers.

"flink-connector-testing" is the experimental connector testing framework, 
aiming to support connector’s end-to-end tests and provide standard test suites 
with general E2E cases.
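
For reference, here is a minimal sketch of the kind of unit test the utilities in 
flink-connector-test-utils enable (this assumes TestingReaderContext's no-arg 
constructor and its getNumSplitRequests()/getSentEvents() accessors; the reader class 
itself is hypothetical and only indicated in a comment):

```
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;

public class MySourceReaderTest {
    public static void main(String[] args) {
        // TestingReaderContext stands in for the runtime-provided SourceReaderContext,
        // recording split requests and source events sent by the reader under test.
        TestingReaderContext context = new TestingReaderContext();

        // A connector's reader would normally be constructed against this context, e.g.:
        // MySourceReader reader = new MySourceReader(context);  // hypothetical reader

        // Simulate what a reader does when it needs more splits:
        context.sendSplitRequest();

        System.out.println("split requests recorded: " + context.getNumSplitRequests());
        System.out.println("events sent to coordinator: " + context.getSentEvents().size());
    }
}
```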

Looking forward to your feedback!

Best regards, 

Qingsheng Ren

[DISCUSS] Future of Per-Job Mode

2022-01-13 Thread Konstantin Knauf
Hi everyone,

I would like to discuss and understand if the benefits of having Per-Job
Mode in Apache Flink outweigh its drawbacks.


*# Background: Flink's Deployment Modes*
Flink currently has three deployment modes. They differ in the following
dimensions:
* main() method executed on Jobmanager or Client
* dependencies shipped by client or bundled with all nodes
* number of jobs per cluster & relationship between job and cluster lifecycle
* (supported resource providers)

## Application Mode
* main() method executed on Jobmanager
* dependencies already need to be available on all nodes
* dedicated cluster for all jobs executed from the same main()-method
(Note: applications with more than one job currently still have significant
limitations, like missing high availability). Technically, a session cluster
dedicated to all jobs submitted from the same main() method.
* supported by standalone, native kubernetes, YARN

## Session Mode
* main() method executed in client
* dependencies are distributed from and by the client to all nodes
* cluster is shared by multiple jobs submitted from different clients,
independent lifecycle
* supported by standalone, Native Kubernetes, YARN

## Per-Job Mode
* main() method executed in client
* dependencies are distributed from and by the client to all nodes
* dedicated cluster for a single job
* supported by YARN only
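
For illustration, the first dimension above (where the main() method runs) can be made
tangible with a tiny probe job like the sketch below (not from this thread; class and
job names are made up): with session or per-job mode the printed hostname is the
client's, with application mode it is the Jobmanager's.

```
import java.net.InetAddress;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WhereDoesMainRun {
    public static void main(String[] args) throws Exception {
        // Executes wherever main() runs: on the client (session / per-job mode)
        // or on the Jobmanager (application mode).
        System.out.println("main() is executing on: "
                + InetAddress.getLocalHost().getHostName());

        // The job itself always runs on the TaskManagers, regardless of deployment mode.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        env.execute("where-does-main-run");
    }
}
```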


*# Reasons to Keep*
* There are use cases where you might need the combination of a single job per
cluster, but main() method execution in the client. This combination is only
supported by per-job mode.
* It currently exists. Existing users will need to migrate to either
session or application mode.


*# Reasons to Drop*
* With Per-Job Mode and Application Mode we have two modes that for most users
probably do the same thing, specifically for those users who don't care where
the main() method is executed and just want to submit a single job per cluster.
Having two ways to do the same thing is confusing.
* Per-Job Mode is only supported by YARN anyway. If we keep it, we should
work towards support in Kubernetes and Standalone, too, to reduce special
casing.
* Dropping per-job mode would reduce complexity in the code and allow us to
dedicate more resources to the other two deployment modes.
* I believe with session mode and application mode we have two easily
distinguishable and understandable deployment modes that cover Flink's use
cases:
   * session mode: olap-style, interactive jobs/queries, short lived batch
jobs, very small jobs, traditional cluster-centric deployment mode (fits
the "Hadoop world")
   * application mode: long-running streaming jobs, large scale &
heterogeneous jobs (resource isolation!), application-centric deployment
mode (fits the "Kubernetes world")


*# Call to Action*
* Do you use per-job mode? If so, why & would you be able to migrate to one
of the other methods?
* Am I missing any pros/cons?
* Are you in favor of dropping per-job mode midterm?

Cheers and thank you,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Flink native k8s integration vs. operator

2022-01-13 Thread Konstantin Knauf
Hi Thomas,

Yes, I was referring to a separate repository under Apache Flink.

Cheers,

Konstantin

On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:

> Hi everyone,
>
> Thanks for the feedback and discussion. A few additional thoughts:
>
> [Konstantin] > With respect to common lifecycle management operations:
> these features are
> > not available (within Apache Flink) for any of the other resource
> providers
> > (YARN, Standalone) either. From this perspective, I wouldn't consider
> this
> > a shortcoming of the Kubernetes integration.
>
> I think time and evolution of the ecosystem are factors to consider as
> well. The state and usage of Flink was much different when YARN
> integration was novel. Expectations are different today and the
> lifecycle functionality provided by an operator may as well be
> considered essential to support the concept of a Flink application on
> k8s. After few years learning from operator experience outside of
> Flink it might be a good time to fill the gap.
>
> [Konstantin] > I still believe that we should keep this focus on low
> > level composable building blocks (like Jobs and Snapshots) in Apache
> Flink
> > to make it easy for everyone to build fitting higher level abstractions
> > like a FlinkApplication Custom Resource on top of it.
>
> I completely agree that it is important that the basic functions of
> Flink are solid and continued focus is necessary. Thanks for sharing
> the pointers, these are great improvements. At the same time,
> ecosystem, contributor base and user spectrum are growing. There have
> been significant additions in many areas of Flink including connectors
> and higher level abstractions like statefun, SQL and Python. It's also
> evident from additional repositories/subprojects that we have in Flink
> today.
>
> [Konstantin] > Having said this, if others in the community have the
> capacity to push and
> > *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
> > Flink, I don't see any blockers. If or when this happens, I'd see some
> > clear benefits of using a separate repository (easier independent
> > versioning and releases, different build system & tooling (go, I
> assume)).
>
> Naturally different contributors to the project have different focus.
> Let's find out if there is strong enough interest to take this on and
> strong enough commitment to maintain. As I see it, there is a
> tremendous amount of internal investment going into operationalizing
> Flink within many companies. Improvements to the operational side of
> Flink like the operator would complement Flink nicely. I assume that
> you are referring to a separate repository within Apache Flink, which
> would give it the chance to achieve better sustainability than the
> existing external operator efforts. There is also the fact that some
> organizations that are heavily invested in operationalizing Flink allow
> contributions to Apache Flink itself but less so to arbitrary GitHub
> projects. Regarding the tooling, it could well turn out that
> Java is a good alternative given the ecosystem focus and that there is
> an opportunity for reuse in certain aspects (metrics, logging etc.).
>
> [Yang] > I think Xintong has given a strong point why we introduced
> the native K8s integration, which is active resource management.
> > I have a concrete example for this in production. When a K8s node is
> down, the standalone K8s deployment will take a longer
> > recovery time based on the K8s eviction time (IIRC, the default is 5
> minutes). For the native K8s integration, Flink RM can be aware of the
> > lost TM heartbeat and allocate a new one in a timely manner.
>
> Thanks for sharing this, we should evaluate it as part of a proposal.
> If we can optimize recovery or scaling with active resource management,
> then perhaps it is worth supporting it through the operator.
> Previously mentioned operators all rely on the standalone model.
>
> Cheers,
> Thomas
>
> On Wed, Jan 12, 2022 at 3:21 AM Konstantin Knauf 
> wrote:
> >
> > cc dev@
> >
> > Hi Thomas, Hi everyone,
> >
> > Thank you for starting this discussion and sorry for chiming in late.
> >
> > I agree with Thomas' and David's assessment of Flink's "Native Kubernetes
> > Integration", in particular, it does actually not integrate well with the
> > Kubernetes ecosystem despite being called "native" (tooling, security
> > concerns).
> >
> > With respect to common lifecycle management operations: these features
> are
> > not available (within Apache Flink) for any of the other resource
> providers
> > (YARN, Standalone) either. From this perspective, I wouldn't consider
> this
> > a shortcoming of the Kubernetes integration. Instead, we have been
> focusing
> > our efforts in Apache Flink on the operations of a single Job, and left
> > orchestration and lifecycle management that spans multiple Jobs to
> > ecosystem projects. I still believe that we should keep this focus on low
> > level composable building blocks (like J

[jira] [Created] (FLINK-25642) The standalone deployment mode should also support the parameter pipeline.jars

2022-01-13 Thread john (Jira)
john created FLINK-25642:


 Summary: The standalone deployment mode should also support the 
parameter pipeline.jars
 Key: FLINK-25642
 URL: https://issues.apache.org/jira/browse/FLINK-25642
 Project: Flink
  Issue Type: Improvement
Reporter: john


The standalone deployment mode should also support the parameter pipeline.jars. 
Although we can put the jar package into the lib or usrlib directory, that is still 
not flexible enough. I think it is necessary to support the parameter 
pipeline.jars, especially when we deploy jobs on Kubernetes.
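
For context, this is roughly how the option is populated programmatically today on 
resource providers that honor it (a sketch; the jar path is made up):

{code:java}
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class PipelineJarsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // pipeline.jars takes a list of jar URLs that should be shipped with the job.
        conf.set(PipelineOptions.JARS,
                Collections.singletonList("file:///opt/flink/usrlib/my-job.jar"));
        System.out.println(conf.get(PipelineOptions.JARS));
    }
}
{code}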



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25641) Unexpected aggregate plan after load hive module

2022-01-13 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25641:
--

 Summary: Unexpected aggregate plan after load hive module
 Key: FLINK-25641
 URL: https://issues.apache.org/jira/browse/FLINK-25641
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang
 Attachments: image-2022-01-13-15-52-27-783.png, 
image-2022-01-13-15-55-40-958.png

When using Flink batch SQL to run Hive SQL queries, we load the hive module to use 
Hive built-in functions.
However, some query plans are unexpected after loading the hive module.
For the following sql,

{code:sql}
load module hive;
use modules hive,core;
set table.sql-dialect=hive;

select
  account_id,
  sum(impression)
from test_db.test_table where dt = '2022-01-10' and hi = '0100' group by 
account_id
{code}
The plan is:

 !image-2022-01-13-15-55-40-958.png! 

After removing 'load module hive; use modules hive, core;', the plan is:

 !image-2022-01-13-15-52-27-783.png! 

After loading the hive module, hash aggregate is not chosen in the final plan because 
the aggregate buffer is not of fixed length; its type is as follows.
{code:java}
LEGACY('RAW', 'ANY')
{code}
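
For anyone reproducing this from the Table API rather than the SQL client, the module 
setup above corresponds roughly to the following (a sketch; the Hive version string is 
only an example and must match the actual installation):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveModuleSetup {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Equivalent of: load module hive; use modules hive, core;
        tableEnv.loadModule("hive", new HiveModule("2.3.6"));  // example Hive version
        tableEnv.useModules("hive", "core");

        // Equivalent of: set table.sql-dialect=hive;
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    }
}
{code}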






--
This message was sent by Atlassian Jira
(v8.20.1#820001)