Re: [DISCUSS] FLIP-137: Support Pandas UDAF in PyFlink

2020-09-02 Thread jincheng sun
Thanks for the update Xingbo!

Pandas UDAF can reuse the aggregate function (user-defined aggregate function)
class interface from FLIP-139, with the core logic of a Pandas UDAF written by
the user in the `accumulate` method. In this way, we can unify the
interface semantics of all UDAFs.
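
For illustration, a rough sketch of what such a unified Pandas UDAF could look
like in user code is shown below. It assumes a FLIP-139 style AggregateFunction
base class with an `accumulate` method, an `open` hook (e.g. for metrics), and a
`udaf()` wrapper with func_type="pandas"; all names follow the FLIP drafts under
discussion and may differ from the final API.

from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf  # names per the FLIP drafts


class Mean(AggregateFunction):
    """Sketch of a Pandas UDAF: the user's core logic lives in accumulate()."""

    def open(self, function_context):
        # Hook where metrics could be registered; purely illustrative here.
        pass

    def create_accumulator(self):
        # Running sum and count.
        return [0.0, 0]

    def accumulate(self, accumulator, value):
        # With func_type="pandas", `value` arrives as a pandas.Series, so the
        # whole batch can be processed with vectorized operations.
        accumulator[0] += float(value.sum())
        accumulator[1] += len(value)

    def get_value(self, accumulator):
        return accumulator[0] / accumulator[1] if accumulator[1] else None


# Wrap the class so it can be registered and used in Table API / SQL queries.
mean_udaf = udaf(Mean(), result_type=DataTypes.FLOAT(), func_type="pandas")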

What do you think?

Best,
Jincheng



Xingbo Huang  wrote on Mon, Aug 31, 2020 at 6:06 PM:

> Hi Jincheng,
>
> Thanks a lot for joining the discussion and the suggestion of discussing
> FLIP-137 and FLIP-139 together.
>
> >> 1. We also need to consider how pandas UDAF supports metrics, and
> whether
> we need a custom interface for pandas UDAF?
>
> Yes. We need to add an interface so that users can add some logic in the
> `open` or `close` method such as creating metrics. I have added the
> definition of the interface and the corresponding example in the doc.
>
> >> 2. We have added @udaf(); should it also be used for ordinary Python UDAFs?
>
> Yes. Looking at Python user-defined functions as a whole, we use @udf to
> describe both general Python UDFs and Pandas UDFs, @udtf to describe Python
> UDTFs, and @udaf to describe both general Python UDAFs and Pandas UDAFs,
> which is more unified. I will discuss it in FLIP-139 later.
>
> Best,
> Xingbo
>
> jincheng sun  wrote on Mon, Aug 31, 2020 at 11:05 AM:
>
> > Hi Xingbo,
> >
> > Thanks for the discussion! Overall, + 1 for this FLIP.
> > I have two points to add:
> >
> >  - We also need to consider how pandas UDAF supports metrics, and whether
> > we need a custom interface for pandas UDAF?
> >  - We have added @udaf(); should it also be used for ordinary Python UDAFs? If
> > not, the addition of @udaf is not appropriate. We need to discuss it further.
> >
> > We can consider it in combination with FLIP-139 for the design. What do you
> think?
> >
> > Best,
> > Jincheng
> >
> >
> > Xingbo Huang  wrote on Mon, Aug 24, 2020 at 2:25 PM:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion thread on "Support Pandas UDAF in
> > > PyFlink"
> > >
> > > Pandas UDF has been supported in FLINK 1.11 (FLIP-97[1]). It solves the
> > > high serialization/deserialization overhead in Python UDF and makes it
> > > convenient to leverage the popular Python libraries such as Pandas,
> > Numpy,
> > > etc. Since Pandas UDF has so many advantages, we want to support Pandas
> > > UDAF to extend usage of Pandas UDF.
> > >
> > > Dian Fu and I have discussed offline and have drafted the FLIP-137[2].
> It
> > > includes the following items:
> > >   - Support Pandas UDAF in Batch Group Aggregation
> > >   - Support Pandas UDAF in Batch Group Window Aggregation
> > >   - Support Pandas UDAF in Batch Over Window Aggregation
> > >   - Support Pandas UDAF in Stream Group Window Aggregation
> > >   - Support Pandas UDAF in Stream Bounded Over Window Aggregation
> > >
> > >
> > > Looking forward to your feedback!
> > >
> > > Best,
> > > Xingbo
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink
> > > [2]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-137%3A+Support+Pandas+UDAF+in+PyFlink
> > >
> >
>


[jira] [Created] (FLINK-19130) Expose backpressure metrics / logs for function dispatcher operator

2020-09-02 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-19130:
---

 Summary: Expose backpressure metrics / logs for function 
dispatcher operator
 Key: FLINK-19130
 URL: https://issues.apache.org/jira/browse/FLINK-19130
 Project: Flink
  Issue Type: Task
  Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


As of now, there is no visibility on why or how backpressure is applied in 
Stateful Functions.
This JIRA attempts to add two metrics as an initial effort to provide more
visibility:
- Total number of addresses that have asked to be blocked
- Total number of inflight pending async operations





Re: FileSystemHaServices and BlobStore

2020-09-02 Thread Yang Wang
Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume +
FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be
built into the image, downloaded by an init-container,
or mounted via the PV, so they do not need to be recovered from HA storage.
But I think the checkpoint path and counter
should be persisted so that we can recover from the latest checkpoint.


Best,
Yang

Alexey Trenikhun  wrote on Wed, Sep 2, 2020 at 7:36 AM:

> Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars,
> RPC messages and log files) only jar files need HA guarantee, and in my
> particular case, job cluster with jar as part of image, it seems doesn't
> matter, I guess it explains why in my test I was able to recover from
> failure even VoidBlobStore. I also use StatefulSet instead of Deployment
>
> Thanks,
> Alexey
>
> --
> *From:* Yang Wang 
> *Sent:* Tuesday, September 1, 2020 1:58 AM
> *To:* dev 
> *Cc:* Alexey Trenikhun ; Flink User Mail List <
> u...@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Glad to hear that your are interested the K8s HA support.
>
> Roman's answer is just on point.
>
> "FileSystemBlobStore" is trying to store the user jars, job graph, etc. on
> the distributed storage(e.g. HDFS, S3, GFS). So when the
> JobManager failover, it could fetch the blob data from remote storage. It
> is very important for standalone and Yarn deployment since
> the local blob store is ephemeral and will be cleaned up after JobManager
> terminated.
>
> However, in your case, benefit from the K8s persistent volume, all the
> local blob data could be recovered after JobManager pod restarted.
> Then you could find that the jobs are recovered and keeps to running.
> Please also remember that the checkpoint meta and counter also
> need to be stored in local file. After then the Flink job could recover
> from the latest checkpoint successfully.
>
> > About the FileSystemHaService
> I am a little skeptical about this feature. Currently, we are using K8s
> deployment for the JobManager. And it is not always guaranteed only
> one JobManager is running. For example, the kubelet is down and never be
> pulled up again. I am trying to work on another ticket "native K8s HA"[1],
> in which we will get a fully functional HA service, including leader
> election/retrieval, jobgraph meta store, checkpoint meta store, running
> registry, etc.
> It could be used for standalone K8s and native K8s deployment.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-12884
>
> Best,
> Yang
>
>
> Khachatryan Roman  wrote on Mon, Aug 31, 2020 at 8:52 PM:
>
> + dev
>
> Blob store is used for jars, serialized job, and task information and logs.
> You can find some information at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
>
> I guess in your setup, Flink was able to pick up local files.
> HA setup presumes that Flink can survive the loss of that JM host and its
> local files.
>
> I'm not sure about K8s native setup - probably VoidBlobStore is enough if
> there is a persistent volume.
> But in the general case, FileSystemBlobStore should be used to store files
> on some DFS.
>
>
> Regards,
> Roman
>
>
> On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun  wrote:
>
> > Did test with streaming job and FileSystemHaService using VoidBlobStore
> > (no HA Blob), looks like job was able to recover from both JM restart and
> > TM restart. Any idea in what use cases HA Blob is needed?
> >
> > Thanks,
> > Alexey
> > --
> > *From:* Alexey Trenikhun 
> > *Sent:* Friday, August 28, 2020 11:31 AM
> > *To:* Khachatryan Roman 
> > *Cc:* Flink User Mail List 
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Motivation is to have k8s HA setup without extra component - Zookeeper,
> > see [1]
> >
> > Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> > like  if we start job from savepoint, then persistence of BlobStore is
> > not necessary, but is it needed if we recover from checkpoint?
> >
> > Thanks,
> > Alexey
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-17598
> >
> >
> > --
> > *From:* Khachatryan Roman 
> > *Sent:* Friday, August 28, 2020 9:24 AM
> > *To:* Alexey Trenikhun 
> > *Cc:* Flink User Mail List 
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Hello Alexey,
> >
> > I think you need FileSystemBlobStore as you are implementing HA Services,
> > and BLOBs should be highly available too.
> > However, I'm a bit concerned about the direction in general: it
> > essentially means re-implementing ZK functionality on top of FS.
> > What are the motivation and the use case?
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:
> >
> > Hello,
> > I'm thinking about implementing FileSystemHaServices - single leader, but
> > 

RE: [DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread Zhou, Brian
Hi,

Thanks Becket for addressing the issue. FLINK-18641 is now a blocker for
Pravega connector integration; we hope it can be included in the 1.11.2 release.

Best Regards,
Brian

-Original Message-
From: Becket Qin  
Sent: Thursday, September 3, 2020 11:18
To: dev
Cc: khachatryan.ro...@gmail.com; Till Rohrmann; david; Jingsong Li
Subject: Re: [DISCUSS] Releasing Flink 1.11.2


Hi Zhuzhu,

Thanks for starting the discussion.

I'd like to include FLINK-18641 into 1.11.2 as well. It is a regression from 
previous versions and is currently blocking the development of Pravega source 
on top of FLIP-27.

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 2, 2020 at 11:13 PM Zhu Zhu  wrote:

> Thank you all for the inputs!
>
> I agree with Till that we should set a soft deadline first.
> I'd like to propose next Monday if no new blocker issue pops up.
> But feel free to raise your concerns if you feel next Monday as a 
> deadline may not work for fixes which should be a blocker for 1.11.2.
>
> Here's a summary of the wanted/blocker but still open fixes:
> - FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory
> - FLINK-19109 Split Reader eats chained periodic watermarks
> - (not a strict blocker) FLINK-18959 Fail to archiveExecutionGraph 
> because job is not finished when dispatcher close
> - FLINK-18934 Idle stream does not advance watermark in connected 
> stream
>
> Thanks,
> Zhu
>
> Konstantin Knauf  wrote on Wed, Sep 2, 2020 at 9:00 PM:
>
> > I think it would be nice to include a fix for 
> > https://issues.apache.org/jira/browse/FLINK-18934, too, as it 
> > affects a highly requested feature of Flink 1.11 quite severely.
> >
> > On Wed, Sep 2, 2020 at 2:51 PM Till Rohrmann 
> wrote:
> >
> > > Thanks a lot for starting this discussion Zhu Zhu and for 
> > > volunteering
> as
> > > the release manager. Big +1 for creating the next 1.11 bug fix
> release. I
> > > think we already collected quite a bit of fixes which our users 
> > > will benefit from.
> > >
> > > For the pending fixes, I would suggest setting a soft deadline 
> > > (maybe
> > until
> > > beginning of next week) and then starting to cut the release 
> > > (given
> that
> > no
> > > other blocker issues pop up). Maybe we are able to resolve the 
> > > issues
> > even
> > > earlier which would allow us to cut the release also earlier.
> > >
> > > From my side I would like to include FLINK-18959 in the release. 
> > > But it
> > is
> > > not a strict release blocker.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Sep 2, 2020 at 2:40 PM David Anderson 
> > > 
> > > wrote:
> > >
> > > > I think it's worth considering whether we can get this bugfix
> included
> > in
> > > > 1.11.2:
> > > >
> > > > - FLINK-19109 Split Reader eats chained periodic watermarks
> > > >
> > > > There is a PR, but it's still a work in progress. Cc'ing Roman, 
> > > > who
> has
> > > > been working on this.
> > > >
> > > > Regards,
> > > > David
> > > >
> > > >
> > > > On Wed, Sep 2, 2020 at 2:19 PM Zhu Zhu  wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > It has been about 1 month since we released Flink 1.11.1. It's 
> > > > > not
> > too
> > > > far
> > > > > but
> > > > > we already have more than 80 resolved improvements/bugs in the
> > > > release-1.11
> > > > > branch. Some of them are quite critical. Therefore, I propose 
> > > > > to
> > create
> > > > the
> > > > > next
> > > > > bugfix release 1.11.2 for Flink 1.11.
> > > > >
> > > > > Most noticeable fixes are:
> > > > > - FLINK-18769 MiniBatch doesn't work with FLIP-95 source
> > > > > - FLINK-18902 Cannot serve results of asynchronous REST 
> > > > > operations
> in
> > > > > per-job mode
> > > > > - FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
> > > > > - FLINK-18608 CustomizedConvertRule#convertCast drops 
> > > > > nullability
> > > > > - FLINK-18646 Managed memory released check can block RPC 
> > > > > thread
> > > > > - FLINK-18993 Invoke sanityCheckTotalFlinkMemory method 
> > > > > incorrectly
> > in
> > > > > JobManagerFlinkMemoryUtils.java
> > > > > - FLINK-18663 RestServerEndpoint may prevent server shutdown
> > > > > - FLINK-18595 Deadlock during job shutdown
> > > > > - FLINK-18581 Cannot find GC cleaner with java version 
> > > > > previous
> > > > > jdk8u72(-b01)
> > > > > - FLINK-17075 Add task status reconciliation between TM and JM
> > > > >
> > > > > Furthermore, I think the following blocker issue should be 
> > > > > merged
> > > before
> > > > > 1.11.2 release
> > > > >
> > > > > - FLINK-19121 Avoid accessing HDFS frequently in
> > HiveBulkWriterFactory
> > > > >
> > > > > I would volunteer as the release manager and kick off the 
> > > > > release
> > > > process.
> > > > > What do you think?
> > > > >
> > > > > Please let me know if there are any concerns or any missing 
> > > > > blocker
> > > > issues
> > > > > need to be fixed in 1.11.2.
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Konstantin 

Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread Becket Qin
Hi Zhuzhu,

Thanks for starting the discussion.

I'd like to include FLINK-18641 into 1.11.2 as well. It is a regression
from previous versions and is currently blocking the development of Pravega
source on top of FLIP-27.

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 2, 2020 at 11:13 PM Zhu Zhu  wrote:

> Thank you all for the inputs!
>
> I agree with Till that we should set a soft deadline first.
> I'd like to propose next Monday if no new blocker issue pops up.
> But feel free to raise your concerns if you feel next Monday as a deadline
> may not work
> for fixes which should be a blocker for 1.11.2.
>
> Here's a summary of the wanted/blocker but still open fixes:
> - FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory
> - FLINK-19109 Split Reader eats chained periodic watermarks
> - (not a strict blocker) FLINK-18959 Fail to archiveExecutionGraph because
> job is not finished when dispatcher close
> - FLINK-18934 Idle stream does not advance watermark in connected stream
>
> Thanks,
> Zhu
>
> Konstantin Knauf  wrote on Wed, Sep 2, 2020 at 9:00 PM:
>
> > I think it would be nice to include a fix for
> > https://issues.apache.org/jira/browse/FLINK-18934, too, as it affects a
> > highly requested feature of Flink 1.11 quite severely.
> >
> > On Wed, Sep 2, 2020 at 2:51 PM Till Rohrmann 
> wrote:
> >
> > > Thanks a lot for starting this discussion Zhu Zhu and for volunteering
> as
> > > the release manager. Big +1 for creating the next 1.11 bug fix
> release. I
> > > think we already collected quite a bit of fixes which our users will
> > > benefit from.
> > >
> > > For the pending fixes, I would suggest setting a soft deadline (maybe
> > until
> > > beginning of next week) and then starting to cut the release (given
> that
> > no
> > > other blocker issues pop up). Maybe we are able to resolve the issues
> > even
> > > earlier which would allow us to cut the release also earlier.
> > >
> > > From my side I would like to include FLINK-18959 in the release. But it
> > is
> > > not a strict release blocker.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Sep 2, 2020 at 2:40 PM David Anderson 
> > > wrote:
> > >
> > > > I think it's worth considering whether we can get this bugfix
> included
> > in
> > > > 1.11.2:
> > > >
> > > > - FLINK-19109 Split Reader eats chained periodic watermarks
> > > >
> > > > There is a PR, but it's still a work in progress. Cc'ing Roman, who
> has
> > > > been working on this.
> > > >
> > > > Regards,
> > > > David
> > > >
> > > >
> > > > On Wed, Sep 2, 2020 at 2:19 PM Zhu Zhu  wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > It has been about 1 month since we released Flink 1.11.1. It's not
> > too
> > > > far
> > > > > but
> > > > > we already have more than 80 resolved improvements/bugs in the
> > > > release-1.11
> > > > > branch. Some of them are quite critical. Therefore, I propose to
> > create
> > > > the
> > > > > next
> > > > > bugfix release 1.11.2 for Flink 1.11.
> > > > >
> > > > > Most noticeable fixes are:
> > > > > - FLINK-18769 MiniBatch doesn't work with FLIP-95 source
> > > > > - FLINK-18902 Cannot serve results of asynchronous REST operations
> in
> > > > > per-job mode
> > > > > - FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
> > > > > - FLINK-18608 CustomizedConvertRule#convertCast drops nullability
> > > > > - FLINK-18646 Managed memory released check can block RPC thread
> > > > > - FLINK-18993 Invoke sanityCheckTotalFlinkMemory method incorrectly
> > in
> > > > > JobManagerFlinkMemoryUtils.java
> > > > > - FLINK-18663 RestServerEndpoint may prevent server shutdown
> > > > > - FLINK-18595 Deadlock during job shutdown
> > > > > - FLINK-18581 Cannot find GC cleaner with java version previous
> > > > > jdk8u72(-b01)
> > > > > - FLINK-17075 Add task status reconciliation between TM and JM
> > > > >
> > > > > Furthermore, I think the following blocker issue should be merged
> > > before
> > > > > 1.11.2 release
> > > > >
> > > > > - FLINK-19121 Avoid accessing HDFS frequently in
> > HiveBulkWriterFactory
> > > > >
> > > > > I would volunteer as the release manager and kick off the release
> > > > process.
> > > > > What do you think?
> > > > >
> > > > > Please let me know if there are any concerns or any missing blocker
> > > > issues
> > > > > need to be fixed in 1.11.2.
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Konstantin Knauf
> >
> > https://twitter.com/snntrable
> >
> > https://github.com/knaufk
> >
>


Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread Zhijiang
Thanks for launching this discussion and volunteering as the release manager.
+1 on my side, and I am willing to provide any help during the release
procedure. :)


Best,
Zhijiang


--
From:Konstantin Knauf 
Send Time: Wed, Sep 2, 2020, 23:44
To:dev 
Cc:khachatryan.roman 
Subject:Re: [DISCUSS] Releasing Flink 1.11.2

I think it would be nice to include a fix for
https://issues.apache.org/jira/browse/FLINK-18934, too, as it affects a
highly requested feature of Flink 1.11 quite severely.

On Wed, Sep 2, 2020 at 2:51 PM Till Rohrmann  wrote:

> Thanks a lot for starting this discussion Zhu Zhu and for volunteering as
> the release manager. Big +1 for creating the next 1.11 bug fix release. I
> think we already collected quite a bit of fixes which our users will
> benefit from.
>
> For the pending fixes, I would suggest setting a soft deadline (maybe until
> beginning of next week) and then starting to cut the release (given that no
> other blocker issues pop up). Maybe we are able to resolve the issues even
> earlier which would allow us to cut the release also earlier.
>
> From my side I would like to include FLINK-18959 in the release. But it is
> not a strict release blocker.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 2:40 PM David Anderson 
> wrote:
>
> > I think it's worth considering whether we can get this bugfix included in
> > 1.11.2:
> >
> > - FLINK-19109 Split Reader eats chained periodic watermarks
> >
> > There is a PR, but it's still a work in progress. Cc'ing Roman, who has
> > been working on this.
> >
> > Regards,
> > David
> >
> >
> > On Wed, Sep 2, 2020 at 2:19 PM Zhu Zhu  wrote:
> >
> > > Hi All,
> > >
> > > It has been about 1 month since we released Flink 1.11.1. It's not too
> > far
> > > but
> > > we already have more than 80 resolved improvements/bugs in the
> > release-1.11
> > > branch. Some of them are quite critical. Therefore, I propose to create
> > the
> > > next
> > > bugfix release 1.11.2 for Flink 1.11.
> > >
> > > Most noticeable fixes are:
> > > - FLINK-18769 MiniBatch doesn't work with FLIP-95 source
> > > - FLINK-18902 Cannot serve results of asynchronous REST operations in
> > > per-job mode
> > > - FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
> > > - FLINK-18608 CustomizedConvertRule#convertCast drops nullability
> > > - FLINK-18646 Managed memory released check can block RPC thread
> > > - FLINK-18993 Invoke sanityCheckTotalFlinkMemory method incorrectly in
> > > JobManagerFlinkMemoryUtils.java
> > > - FLINK-18663 RestServerEndpoint may prevent server shutdown
> > > - FLINK-18595 Deadlock during job shutdown
> > > - FLINK-18581 Cannot find GC cleaner with java version previous
> > > jdk8u72(-b01)
> > > - FLINK-17075 Add task status reconciliation between TM and JM
> > >
> > > Furthermore, I think the following blocker issue should be merged
> before
> > > 1.11.2 release
> > >
> > > - FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory
> > >
> > > I would volunteer as the release manager and kick off the release
> > process.
> > > What do you think?
> > >
> > > Please let me know if there are any concerns or any missing blocker
> > issues
> > > need to be fixed in 1.11.2.
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> >
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk



Re: [DISCUSS] Introduce partitioning strategies to Table/SQL

2020-09-02 Thread Benchao Li
Hi Jingsong,

Thanks for the clarification, and sorry for misunderstanding your original
intention.
What I was talking about is indeed another topic; we can leave it for the
future
and see whether other people have the same scenarios.

Jingsong Li  wrote on Thu, Sep 3, 2020 at 10:56 AM:

> Thanks Timo for working on FLIP-107.
>
> Agree, I think it is good.
> I'll spend more time to form a FLIP in detail later.
>
> Best,
> Jingsong
>
> On Wed, Sep 2, 2020 at 7:12 PM Timo Walther  wrote:
>
> > Hi Jingsong,
> >
> > I haven't looked at your proposal but I think it makes sense to have a
> > separate FLIP for the partitioning topic. I'm currently working on an
> > update to FLIP-107 and would suggest removing the partitioning topic
> > there. FLIP-107 will only focus on accessing metadata and expressing
> > key/value formats.
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 07:39, Jingsong Li wrote:
> > > Thanks Konstantin and Benchao for your response.
> > >
> > > If we need to push forward the implementation, it should be a FLIP.
> > >
> > > My original intention was to unify the partition definitions for
> batches
> > > and streams:
> > >
> > > - What is "PARTITION" on a table? Partitions define the physical
> storage
> > > form of a table. Different partitions should be stored in different
> > places.
> > > There should be good isolation between them. Therefore, in the
> connector
> > > dimension, we can operate on a partition separately.
> > > - For the Kafka table, the partition is a finite integer value,
> depending
> > > on how many partitions Kafka has.
> > > - For the Filesystem table, the partition is a directory structure,
> which
> > > may be a multi-level structure. And it can be any type, because any
> type
> > > can be converted to a character string.
> > >
> > > So, how do we generate a partition value? It can be directly mapped to
> a
> > > field (identity). In addition, the partition value can also be
> generated
> > by
> > > a function (Transform), this is what I want to discuss with you.
> > >
> > > ## To Konstantin:
> > >
> > >> 1) Does the PARTITION BY clause only have an effect for sink tables
> > > defining how data should be partitioning the sink system or does it
> also
> > > make a difference for source tables? My understanding is that it also
> > makes
> > > a difference for source tables (e.g. if the source system supports
> > > partition pruning). I suppose, for source tables Flink does not
> > > check/enforce this, but trusts that the partitioning information is
> > > correct?!
> > >
> > > Yes, also works for source, according to my understanding, partition
> > > pruning is actually a kind of filtering push-down. The source is
> > > responsible for reading data that contains some specific partitions.
> > >
> > >> 2) I suppose it is up to the connector implementation whether/how to
> > > interpret the partition information. How will this work?
> > >
> > > I think it depends on the interface and implementation. For example,
> for
> > > Kafka, for example, partition fields can be defined on DDL, and some
> > > filtering conditions can be done according to partition fields in
> query.
> > > Kafkasource can operate according to these filter conditions.
> > >
> > >> 3) For Kafka, I suppose, the most common partitioning strategy is by
> > key.
> > > FLIP-107 contains a proposal on how to define the key (which fields of
> > the
> > > schema should become part of the key) when writing to Kafka via Flink
> > SQL.
> > > How does this relate to the PARTITION BY clause?
> > >
> > > How to define a partition should have nothing to do with whether the
> > field
> > > it refers to is a key. Users can refer to the fields in the schema.
> > >
> > > ## To Benchao,
> > >
> > > I feel that your words should be talking about ShuffleBy in the
> > calculation
> > > level? That's a good topic, but this discussion is mainly about
> > partitions
> > > in connector storage.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Sep 1, 2020 at 10:51 AM Benchao Li 
> wrote:
> > >
> > >> Hi Jingsong,
> > >>
> > >> Thanks for bringing up this discussion. I like this idea generally.
> > >> I'd like to add some cases we met in our scenarios.
> > >>
> > >> ## Source Partition By
> > >> There is an use case that users want to do some lookup thing in the
> UDF,
> > >> it's very like the dimension table. It's common for them to cache some
> > >> lookup result
> > >> in their UDF. If there is no 'partition by' from the source, they
> maybe
> > >> need to cache
> > >> more data in each subtask.
> > >> Actually, what they need is `keyBy` from the DataStream world.
> > >> We supported this feature already in our production.
> > >>
> > >> ## Sink Partition By
> > >> Recently we also received some requirements about this feature.
> > >> Users want to add their custom sink, and need the data shuffled before
> > the
> > >> sink.
> > >> They will do some thing for the data of the same partition key.
> > >>
> > >> 

Re: [DISCUSS] Introduce partitioning strategies to Table/SQL

2020-09-02 Thread Jingsong Li
Thanks Timo for working on FLIP-107.

Agree, I think it is good.
I'll spend more time to form a FLIP in detail later.

Best,
Jingsong

On Wed, Sep 2, 2020 at 7:12 PM Timo Walther  wrote:

> Hi Jingsong,
>
> I haven't looked at your proposal but I think it makes sense to have a
> separate FLIP for the partitioning topic. I'm currently working on an
> update to FLIP-107 and would suggest removing the partitioning topic
> there. FLIP-107 will only focus on accessing metadata and expressing
> key/value formats.
>
> What do you think?
>
> Regards,
> Timo
>
> On 01.09.20 07:39, Jingsong Li wrote:
> > Thanks Konstantin and Benchao for your response.
> >
> > If we need to push forward the implementation, it should be a FLIP.
> >
> > My original intention was to unify the partition definitions for batches
> > and streams:
> >
> > - What is "PARTITION" on a table? Partitions define the physical storage
> > form of a table. Different partitions should be stored in different
> places.
> > There should be good isolation between them. Therefore, in the connector
> > dimension, we can operate on a partition separately.
> > - For the Kafka table, the partition is a finite integer value, depending
> > on how many partitions Kafka has.
> > - For the Filesystem table, the partition is a directory structure, which
> > may be a multi-level structure. And it can be any type, because any type
> > can be converted to a character string.
> >
> > So, how do we generate a partition value? It can be directly mapped to a
> > field (identity). In addition, the partition value can also be generated
> by
> > a function (Transform), this is what I want to discuss with you.
> >
> > ## To Konstantin:
> >
> >> 1) Does the PARTITION BY clause only have an effect for sink tables
> > defining how data should be partitioning the sink system or does it also
> > make a difference for source tables? My understanding is that it also
> makes
> > a difference for source tables (e.g. if the source system supports
> > partition pruning). I suppose, for source tables Flink does not
> > check/enforce this, but trusts that the partitioning information is
> > correct?!
> >
> > Yes, also works for source, according to my understanding, partition
> > pruning is actually a kind of filtering push-down. The source is
> > responsible for reading data that contains some specific partitions.
> >
> >> 2) I suppose it is up to the connector implementation whether/how to
> > interpret the partition information. How will this work?
> >
> > I think it depends on the interface and implementation. For example, for
> > Kafka, for example, partition fields can be defined on DDL, and some
> > filtering conditions can be done according to partition fields in query.
> > Kafkasource can operate according to these filter conditions.
> >
> >> 3) For Kafka, I suppose, the most common partitioning strategy is by
> key.
> > FLIP-107 contains a proposal on how to define the key (which fields of
> the
> > schema should become part of the key) when writing to Kafka via Flink
> SQL.
> > How does this relate to the PARTITION BY clause?
> >
> > How to define a partition should have nothing to do with whether the
> field
> > it refers to is a key. Users can refer to the fields in the schema.
> >
> > ## To Benchao,
> >
> > I feel that your words should be talking about ShuffleBy in the
> calculation
> > level? That's a good topic, but this discussion is mainly about
> partitions
> > in connector storage.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Sep 1, 2020 at 10:51 AM Benchao Li  wrote:
> >
> >> Hi Jingsong,
> >>
> >> Thanks for bringing up this discussion. I like this idea generally.
> >> I'd like to add some cases we met in our scenarios.
> >>
> >> ## Source Partition By
> >> There is an use case that users want to do some lookup thing in the UDF,
> >> it's very like the dimension table. It's common for them to cache some
> >> lookup result
> >> in their UDF. If there is no 'partition by' from the source, they maybe
> >> need to cache
> >> more data in each subtask.
> >> Actually, what they need is `keyBy` from the DataStream world.
> >> We supported this feature already in our production.
> >>
> >> ## Sink Partition By
> >> Recently we also received some requirements about this feature.
> >> Users want to add their custom sink, and need the data shuffled before
> the
> >> sink.
> >> They will do some thing for the data of the same partition key.
> >>
> >> An addition to 'Source Partition By' semantic, actually, it's not enough
> >> for current use cases.
> >> The more common way to do this is to add partition by semantic in
> 'view',
> >> then
> >> users can do the 'keyBy' multiple times in one query.
> >>
> >> I've no strong options about these features, just add some use cases and
> >> would like to hear more options about this.
> >>
> >>
> >> Konstantin Knauf  wrote on Mon, Aug 31, 2020 at 7:09 PM:
> >>
> >>> Hi Jingsong,
> >>>
> >>> I would like to understand this FLIP (?) a bit better, but I am 

Re: FileSystemHaServices and BlobStore

2020-09-02 Thread Alexey Trenikhun
Judging from FLIP-19 (thank you, Roman, for the link), of the three use cases (jars,
RPC messages, and log files) only jar files need the HA guarantee, and in my
particular case, a job cluster with the jar as part of the image, it seems it doesn't
matter. I guess that explains why in my test I was able to recover from failure even
with VoidBlobStore. I also use a StatefulSet instead of a Deployment.

Thanks,
Alexey


From: Yang Wang 
Sent: Tuesday, September 1, 2020 1:58 AM
To: dev 
Cc: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Glad to hear that you are interested in the K8s HA support.

Roman's answer is just on point.

"FileSystemBlobStore" is trying to store the user jars, job graph, etc. on the 
distributed storage(e.g. HDFS, S3, GFS). So when the
JobManager failover, it could fetch the blob data from remote storage. It is 
very important for standalone and Yarn deployment since
the local blob store is ephemeral and will be cleaned up after JobManager 
terminated.

However, in your case, benefit from the K8s persistent volume, all the local 
blob data could be recovered after JobManager pod restarted.
Then you could find that the jobs are recovered and keeps to running. Please 
also remember that the checkpoint meta and counter also
need to be stored in local file. After then the Flink job could recover from 
the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we are using a K8s
deployment for the JobManager, and it is not always guaranteed that only
one JobManager is running. For example, the kubelet may go down and never be pulled
up again. I am trying to work on another ticket, "native K8s HA"[1],
in which we will get a fully functional HA service, including leader 
election/retrieval, jobgraph meta store, checkpoint meta store, running 
registry, etc.
It could be used for standalone K8s and native K8s deployment.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang


Khachatryan Roman  wrote on Mon, Aug 31, 2020 at 8:52 PM:
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun  wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> --
> *From:* Alexey Trenikhun 
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> --
> *From:* Khachatryan Roman 
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure whether you need
> FileSystemBlobStore or whether VoidBlobStore is enough. I can't figure out whether
> the BlobStore should survive a JobManager crash. I see that ZookeeperHaServices uses
> FileSystemBlobStore,
> but it is not clear whether that is due to having multiple JobManagers (leader + follower)
> or the necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>


[jira] [Created] (FLINK-19129) Helm charts are missing the latest log4j-console file

2020-09-02 Thread Igal Shilman (Jira)
Igal Shilman created FLINK-19129:


 Summary: Helm charts are missing the latest log4j-console file
 Key: FLINK-19129
 URL: https://issues.apache.org/jira/browse/FLINK-19129
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Igal Shilman
 Fix For: statefun-2.2.0


Our Helm charts still contain the old log4j file (from flink-1.10.x) but 
logging was changed in Flink 1.11.x.





[jira] [Created] (FLINK-19128) Remove the runtime execution configuration in sql-client-defaults.yaml

2020-09-02 Thread Jark Wu (Jira)
Jark Wu created FLINK-19128:
---

 Summary: Remove the runtime execution configuration in 
sql-client-defaults.yaml
 Key: FLINK-19128
 URL: https://issues.apache.org/jira/browse/FLINK-19128
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Reporter: Jark Wu


Currently, {{sql-client-defaults.yaml}} sets some runtime execution
configuration by default, e.g. parallelism and restart strategy. This is very
confusing, because users wonder why the parallelism they changed in
{{flink-conf.yaml}} doesn't take effect.

I think the community had an agreement that the SQL CLI shouldn't provide a new set
of runtime configuration. As a first step, I suggest removing these
configurations defined in {{sql-client-defaults.yaml}}, to make the
configurations in {{flink-conf.yaml}} work by default.





[jira] [Created] (FLINK-19127) Provide a replacement of StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment

2020-09-02 Thread Timo Walther (Jira)
Timo Walther created FLINK-19127:


 Summary: Provide a replacement of 
StreamExecutionEnvironment.createRemoteEnvironment for TableEnvironment
 Key: FLINK-19127
 URL: https://issues.apache.org/jira/browse/FLINK-19127
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Timo Walther


Connecting to a remote cluster from the unified TableEnvironment is neither
tested nor documented. Since StreamExecutionEnvironment is not necessary
anymore, users should be able to do the same in TableEnvironment easily. This
is particularly useful for interactive sessions that run in an IDE.





Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread Zhu Zhu
Thank you all for the inputs!

I agree with Till that we should set a soft deadline first.
I'd like to propose next Monday if no new blocker issue pops up.
But feel free to raise your concerns if you feel next Monday as a deadline
may not work
for fixes that should be blockers for 1.11.2.

Here's a summary of the wanted/blocker but still open fixes:
- FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory
- FLINK-19109 Split Reader eats chained periodic watermarks
- (not a strict blocker) FLINK-18959 Fail to archiveExecutionGraph because
job is not finished when dispatcher close
- FLINK-18934 Idle stream does not advance watermark in connected stream

Thanks,
Zhu

Konstantin Knauf  wrote on Wed, Sep 2, 2020 at 9:00 PM:

> I think it would be nice to include a fix for
> https://issues.apache.org/jira/browse/FLINK-18934, too, as it affects a
> highly requested feature of Flink 1.11 quite severely.
>
> On Wed, Sep 2, 2020 at 2:51 PM Till Rohrmann  wrote:
>
> > Thanks a lot for starting this discussion Zhu Zhu and for volunteering as
> > the release manager. Big +1 for creating the next 1.11 bug fix release. I
> > think we already collected quite a bit of fixes which our users will
> > benefit from.
> >
> > For the pending fixes, I would suggest setting a soft deadline (maybe
> until
> > beginning of next week) and then starting to cut the release (given that
> no
> > other blocker issues pop up). Maybe we are able to resolve the issues
> even
> > earlier which would allow us to cut the release also earlier.
> >
> > From my side I would like to include FLINK-18959 in the release. But it
> is
> > not a strict release blocker.
> >
> > Cheers,
> > Till
> >
> > On Wed, Sep 2, 2020 at 2:40 PM David Anderson 
> > wrote:
> >
> > > I think it's worth considering whether we can get this bugfix included
> in
> > > 1.11.2:
> > >
> > > - FLINK-19109 Split Reader eats chained periodic watermarks
> > >
> > > There is a PR, but it's still a work in progress. Cc'ing Roman, who has
> > > been working on this.
> > >
> > > Regards,
> > > David
> > >
> > >
> > > On Wed, Sep 2, 2020 at 2:19 PM Zhu Zhu  wrote:
> > >
> > > > Hi All,
> > > >
> > > > It has been about 1 month since we released Flink 1.11.1. It's not
> too
> > > far
> > > > but
> > > > we already have more than 80 resolved improvements/bugs in the
> > > release-1.11
> > > > branch. Some of them are quite critical. Therefore, I propose to
> create
> > > the
> > > > next
> > > > bugfix release 1.11.2 for Flink 1.11.
> > > >
> > > > Most noticeable fixes are:
> > > > - FLINK-18769 MiniBatch doesn't work with FLIP-95 source
> > > > - FLINK-18902 Cannot serve results of asynchronous REST operations in
> > > > per-job mode
> > > > - FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
> > > > - FLINK-18608 CustomizedConvertRule#convertCast drops nullability
> > > > - FLINK-18646 Managed memory released check can block RPC thread
> > > > - FLINK-18993 Invoke sanityCheckTotalFlinkMemory method incorrectly
> in
> > > > JobManagerFlinkMemoryUtils.java
> > > > - FLINK-18663 RestServerEndpoint may prevent server shutdown
> > > > - FLINK-18595 Deadlock during job shutdown
> > > > - FLINK-18581 Cannot find GC cleaner with java version previous
> > > > jdk8u72(-b01)
> > > > - FLINK-17075 Add task status reconciliation between TM and JM
> > > >
> > > > Furthermore, I think the following blocker issue should be merged
> > before
> > > > 1.11.2 release
> > > >
> > > > - FLINK-19121 Avoid accessing HDFS frequently in
> HiveBulkWriterFactory
> > > >
> > > > I would volunteer as the release manager and kick off the release
> > > process.
> > > > What do you think?
> > > >
> > > > Please let me know if there are any concerns or any missing blocker
> > > issues
> > > > need to be fixed in 1.11.2.
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


[jira] [Created] (FLINK-19126) Failed to run job in yarn-cluster mode due to No Executor found.

2020-09-02 Thread Tang Yan (Jira)
Tang Yan created FLINK-19126:


 Summary: Failed to run job in yarn-cluster mode due to No Executor 
found.
 Key: FLINK-19126
 URL: https://issues.apache.org/jira/browse/FLINK-19126
 Project: Flink
  Issue Type: Bug
  Components: flink-contrib
Affects Versions: 1.11.1
Reporter: Tang Yan


I've built the Flink package successfully, but when I run the command below, it
fails to submit the job.

[yanta@flink-1.11]$ bin/flink run -m yarn-cluster -p 2 -c 
org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar 
 --input hdfs:///user/yanta/aa.txt --output hdfs:///user/yanta/result.txt

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or 
HADOOP_CLASSPATH was set.
 The program finished with the following exception:

java.lang.IllegalStateException: No Executor found. Please make sure to export
the HADOOP_CLASSPATH environment variable or have hadoop in your classpath. For
more information refer to the "Deployment & Operations" section of the official
Apache Flink documentation.
    at org.apache.flink.yarn.cli.FallbackYarnSessionCli.isActive(FallbackYarnSessionCli.java:59)
    at org.apache.flink.client.cli.CliFrontend.validateAndGetActiveCommandLine(CliFrontend.java:1090)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:218)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)





[jira] [Created] (FLINK-19125) Avoid memory fragmentation when running flink docker image

2020-09-02 Thread Yun Tang (Jira)
Yun Tang created FLINK-19125:


 Summary: Avoid memory fragmentation when running flink docker image
 Key: FLINK-19125
 URL: https://issues.apache.org/jira/browse/FLINK-19125
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes, Runtime / State Backends
Affects Versions: 1.11.1
Reporter: Yun Tang


This ticket tracks the problem of memory fragmentation when launching the default
Flink docker image.

In FLINK-18712, a user reported that if he repeatedly submits a job with the RocksDB
state backend on a k8s session cluster, the memory usage of the task
manager grows continuously after each job finishes, until the process is OOM killed.
I reproduced this problem with the official Flink docker image no matter how
RocksDB is used (with or without managed memory).

I dug into the problem and found that this is due to memory fragmentation caused
by {{glibc}}, which does not return memory to the kernel gracefully (please refer
to [glibc bugzilla|https://sourceware.org/bugzilla/show_bug.cgi?id=15321] and the
[glibc
manual|https://www.gnu.org/software/libc/manual/html_mono/libc.html#Freeing-after-Malloc]).

I found that limiting MALLOC_ARENA_MAX to 2 could mitigate this problem (please
refer to
[choose-for-malloc_arena_max|https://devcenter.heroku.com/articles/tuning-glibc-memory-behavior#what-value-to-choose-for-malloc_arena_max]
for more details).

And if we choose to use jemalloc to allocate memory by rebuilding the docker
image, the problem goes away:

{code}
# Dockerfile snippet: install jemalloc and preload it in place of glibc malloc
RUN apt-get update && apt-get -y install libjemalloc-dev

ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so
{code}

Jemalloc intends to [emphasize fragmentation
avoidance|https://github.com/jemalloc/jemalloc/wiki/Background#intended-use],
and we might consider refactoring our Dockerfile to be based on jemalloc to avoid
memory fragmentation.





Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread Konstantin Knauf
I think it would be nice to include a fix for
https://issues.apache.org/jira/browse/FLINK-18934, too, as it affects a
highly requested feature of Flink 1.11 quite severely.

On Wed, Sep 2, 2020 at 2:51 PM Till Rohrmann  wrote:

> Thanks a lot for starting this discussion Zhu Zhu and for volunteering as
> the release manager. Big +1 for creating the next 1.11 bug fix release. I
> think we already collected quite a bit of fixes which our users will
> benefit from.
>
> For the pending fixes, I would suggest setting a soft deadline (maybe until
> beginning of next week) and then starting to cut the release (given that no
> other blocker issues pop up). Maybe we are able to resolve the issues even
> earlier which would allow us to cut the release also earlier.
>
> From my side I would like to include FLINK-18959 in the release. But it is
> not a strict release blocker.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 2:40 PM David Anderson 
> wrote:
>
> > I think it's worth considering whether we can get this bugfix included in
> > 1.11.2:
> >
> > - FLINK-19109 Split Reader eats chained periodic watermarks
> >
> > There is a PR, but it's still a work in progress. Cc'ing Roman, who has
> > been working on this.
> >
> > Regards,
> > David
> >
> >
> > On Wed, Sep 2, 2020 at 2:19 PM Zhu Zhu  wrote:
> >
> > > Hi All,
> > >
> > > It has been about 1 month since we released Flink 1.11.1. It's not too
> > far
> > > but
> > > we already have more than 80 resolved improvements/bugs in the
> > release-1.11
> > > branch. Some of them are quite critical. Therefore, I propose to create
> > the
> > > next
> > > bugfix release 1.11.2 for Flink 1.11.
> > >
> > > Most noticeable fixes are:
> > > - FLINK-18769 MiniBatch doesn't work with FLIP-95 source
> > > - FLINK-18902 Cannot serve results of asynchronous REST operations in
> > > per-job mode
> > > - FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
> > > - FLINK-18608 CustomizedConvertRule#convertCast drops nullability
> > > - FLINK-18646 Managed memory released check can block RPC thread
> > > - FLINK-18993 Invoke sanityCheckTotalFlinkMemory method incorrectly in
> > > JobManagerFlinkMemoryUtils.java
> > > - FLINK-18663 RestServerEndpoint may prevent server shutdown
> > > - FLINK-18595 Deadlock during job shutdown
> > > - FLINK-18581 Cannot find GC cleaner with java version previous
> > > jdk8u72(-b01)
> > > - FLINK-17075 Add task status reconciliation between TM and JM
> > >
> > > Furthermore, I think the following blocker issue should be merged
> before
> > > 1.11.2 release
> > >
> > > - FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory
> > >
> > > I would volunteer as the release manager and kick off the release
> > process.
> > > What do you think?
> > >
> > > Please let me know if there are any concerns or any missing blocker
> > issues
> > > need to be fixed in 1.11.2.
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> >
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread Till Rohrmann
Thanks a lot for starting this discussion Zhu Zhu and for volunteering as
the release manager. Big +1 for creating the next 1.11 bug fix release. I
think we already collected quite a bit of fixes which our users will
benefit from.

For the pending fixes, I would suggest setting a soft deadline (maybe until
beginning of next week) and then starting to cut the release (given that no
other blocker issues pop up). Maybe we are able to resolve the issues even
earlier which would allow us to cut the release also earlier.

From my side I would like to include FLINK-18959 in the release. But it is
not a strict release blocker.

Cheers,
Till

On Wed, Sep 2, 2020 at 2:40 PM David Anderson  wrote:

> I think it's worth considering whether we can get this bugfix included in
> 1.11.2:
>
> - FLINK-19109 Split Reader eats chained periodic watermarks
>
> There is a PR, but it's still a work in progress. Cc'ing Roman, who has
> been working on this.
>
> Regards,
> David
>
>
> On Wed, Sep 2, 2020 at 2:19 PM Zhu Zhu  wrote:
>
> > Hi All,
> >
> > It has been about 1 month since we released Flink 1.11.1. It's not too
> far
> > but
> > we already have more than 80 resolved improvements/bugs in the
> release-1.11
> > branch. Some of them are quite critical. Therefore, I propose to create
> the
> > next
> > bugfix release 1.11.2 for Flink 1.11.
> >
> > Most noticeable fixes are:
> > - FLINK-18769 MiniBatch doesn't work with FLIP-95 source
> > - FLINK-18902 Cannot serve results of asynchronous REST operations in
> > per-job mode
> > - FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
> > - FLINK-18608 CustomizedConvertRule#convertCast drops nullability
> > - FLINK-18646 Managed memory released check can block RPC thread
> > - FLINK-18993 Invoke sanityCheckTotalFlinkMemory method incorrectly in
> > JobManagerFlinkMemoryUtils.java
> > - FLINK-18663 RestServerEndpoint may prevent server shutdown
> > - FLINK-18595 Deadlock during job shutdown
> > - FLINK-18581 Cannot find GC cleaner with java version previous
> > jdk8u72(-b01)
> > - FLINK-17075 Add task status reconciliation between TM and JM
> >
> > Furthermore, I think the following blocker issue should be merged before
> > 1.11.2 release
> >
> > - FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory
> >
> > I would volunteer as the release manager and kick off the release
> process.
> > What do you think?
> >
> > Please let me know if there are any concerns or any missing blocker
> issues
> > need to be fixed in 1.11.2.
> >
> > Thanks,
> > Zhu Zhu
> >
>


Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread David Anderson
I think it's worth considering whether we can get this bugfix included in
1.11.2:

- FLINK-19109 Split Reader eats chained periodic watermarks

There is a PR, but it's still a work in progress. Cc'ing Roman, who has
been working on this.

Regards,
David


On Wed, Sep 2, 2020 at 2:19 PM Zhu Zhu  wrote:

> Hi All,
>
> It has been about 1 month since we released Flink 1.11.1. It's not too far
> but
> we already have more than 80 resolved improvements/bugs in the release-1.11
> branch. Some of them are quite critical. Therefore, I propose to create the
> next
> bugfix release 1.11.2 for Flink 1.11.
>
> Most noticeable fixes are:
> - FLINK-18769 MiniBatch doesn't work with FLIP-95 source
> - FLINK-18902 Cannot serve results of asynchronous REST operations in
> per-job mode
> - FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
> - FLINK-18608 CustomizedConvertRule#convertCast drops nullability
> - FLINK-18646 Managed memory released check can block RPC thread
> - FLINK-18993 Invoke sanityCheckTotalFlinkMemory method incorrectly in
> JobManagerFlinkMemoryUtils.java
> - FLINK-18663 RestServerEndpoint may prevent server shutdown
> - FLINK-18595 Deadlock during job shutdown
> - FLINK-18581 Cannot find GC cleaner with java version previous
> jdk8u72(-b01)
> - FLINK-17075 Add task status reconciliation between TM and JM
>
> Furthermore, I think the following blocker issue should be merged before
> 1.11.2 release
>
> - FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory
>
> I would volunteer as the release manager and kick off the release process.
> What do you think?
>
> Please let me know if there are any concerns or any missing blocker issues
> need to be fixed in 1.11.2.
>
> Thanks,
> Zhu Zhu
>


[DISCUSS] Releasing Flink 1.11.2

2020-09-02 Thread Zhu Zhu
Hi All,

It has been about 1 month since we released Flink 1.11.1. It's not too far
but
we already have more than 80 resolved improvements/bugs in the release-1.11
branch. Some of them are quite critical. Therefore, I propose to create the
next
bugfix release 1.11.2 for Flink 1.11.

Most noticeable fixes are:
- FLINK-18769 MiniBatch doesn't work with FLIP-95 source
- FLINK-18902 Cannot serve results of asynchronous REST operations in
per-job mode
- FLINK-18682 Vector orc reader cannot read Hive 2.0.0 table
- FLINK-18608 CustomizedConvertRule#convertCast drops nullability
- FLINK-18646 Managed memory released check can block RPC thread
- FLINK-18993 Invoke sanityCheckTotalFlinkMemory method incorrectly in
JobManagerFlinkMemoryUtils.java
- FLINK-18663 RestServerEndpoint may prevent server shutdown
- FLINK-18595 Deadlock during job shutdown
- FLINK-18581 Cannot find GC cleaner with java version previous
jdk8u72(-b01)
- FLINK-17075 Add task status reconciliation between TM and JM

Furthermore, I think the following blocker issue should be merged before
1.11.2 release

- FLINK-19121 Avoid accessing HDFS frequently in HiveBulkWriterFactory

I would volunteer as the release manager and kick off the release process.
What do you think?

Please let me know if there are any concerns or any missing blocker issues
that need to be fixed in 1.11.2.

Thanks,
Zhu Zhu


[VOTE] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-09-02 Thread Aljoscha Krettek

Hi all,

After the discussion in [1], I would like to open a voting thread for 
FLIP-131 (https://s.apache.org/FLIP-131) [2] which discusses the 
deprecation of the DataSet API and future work on the DataStream API and 
Table API for bounded (batch) execution.


The vote will be open until September 7 (72h + weekend), unless there is 
an objection or not enough votes.


Regards,
Aljoscha

[1] 
https://lists.apache.org/thread.html/r4f24c4312cef7270a1349c39b89fb1184c84065944b43aedf9cfba6a%40%3Cdev.flink.apache.org%3E
[2] 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741


Re: [DISCUSS] Introduce partitioning strategies to Table/SQL

2020-09-02 Thread Timo Walther

Hi Jingsong,

I haven't looked at your proposal yet, but I think it makes sense to have a
separate FLIP for the partitioning topic. I'm currently working on an
update to FLIP-107 and would suggest removing the partitioning topic
there. FLIP-107 will only focus on accessing metadata and expressing
key/value formats.


What do you think?

Regards,
Timo

On 01.09.20 07:39, Jingsong Li wrote:

Thanks Konstantin and Benchao for your response.

If we need to push forward the implementation, it should be a FLIP.

My original intention was to unify the partition definitions for batches
and streams:

- What is "PARTITION" on a table? Partitions define the physical storage
form of a table. Different partitions should be stored in different places.
There should be good isolation between them. Therefore, in the connector
dimension, we can operate on a partition separately.
- For the Kafka table, the partition is a finite integer value, depending
on how many partitions Kafka has.
- For the Filesystem table, the partition is a directory structure, which
may be multi-level. The partition value can be of any type, because any
type can be converted to a string.

So, how do we generate a partition value? It can be directly mapped to a
field (identity). In addition, the partition value can also be generated by
a function (transform); this is what I want to discuss with you.
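
To make this concrete, here is a rough sketch in code (the identity case
uses the existing filesystem connector syntax; the transform case is purely
illustrative and not a concrete syntax proposal):

    // (uses org.apache.flink.table.api.TableEnvironment / EnvironmentSettings)
    // Identity partitioning: the partition value is simply the referenced field.
    // This is existing Flink SQL syntax for the filesystem connector.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
    tEnv.executeSql(
        "CREATE TABLE fs_sink (" +
        "  user_id BIGINT," +
        "  amount DOUBLE," +
        "  dt STRING" +
        ") PARTITIONED BY (dt) WITH (" +
        "  'connector' = 'filesystem'," +
        "  'path' = '/tmp/fs_sink'," +
        "  'format' = 'csv'" +
        ")");

    // Transform partitioning (hypothetical, for discussion only): the partition
    // value is derived from a field by a function, e.g. something like
    //   ... PARTITIONED BY (months(ts)) WITH (...)
    // so that rows are bucketed per month instead of per raw timestamp value.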

## To Konstantin:


1) Does the PARTITION BY clause only have an effect for sink tables

defining how data should be partitioning the sink system or does it also
make a difference for source tables? My understanding is that it also makes
a difference for source tables (e.g. if the source system supports
partition pruning). I suppose, for source tables Flink does not
check/enforce this, but trusts that the partitioning information is
correct?!

Yes, it also works for sources. According to my understanding, partition
pruning is actually a kind of filter push-down: the source is only
responsible for reading the data of the matching partitions.


2) I suppose it is up to the connector implementation whether/how to

interpret the partition information. How will this work?

I think it depends on the interface and implementation. For Kafka, for
example, partition fields can be defined in the DDL, and filtering
conditions on those partition fields can be expressed in the query.
The Kafka source can then act on these filter conditions.
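
A tiny, self-contained sketch of what I mean (assuming a table 'logs' that
is partitioned by a 'dt' column; whether and how a given connector honors
the pushed-down partition filter is up to its implementation):

    // The filter on the partition column 'dt' can be turned into partition
    // pruning, so the source only has to read the matching partitions.
    Table pruned = tEnv.sqlQuery(
        "SELECT user_id, amount FROM logs WHERE dt = '2020-09-01'");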


3) For Kafka, I suppose, the most common partitioning strategy is by key.

FLIP-107 contains a proposal on how to define the key (which fields of the
schema should become part of the key) when writing to Kafka via Flink SQL.
How does this relate to the PARTITION BY clause?

How to define a partition should have nothing to do with whether the field
it refers to is a key. Users can refer to the fields in the schema.

## To Benchao,

I feel that what you describe is more about shuffle-by at the computation
level? That's a good topic, but this discussion is mainly about partitions
in connector storage.

Best,
Jingsong

On Tue, Sep 1, 2020 at 10:51 AM Benchao Li  wrote:


Hi Jingsong,

Thanks for bringing up this discussion. I like this idea generally.
I'd like to add some cases we met in our scenarios.

## Source Partition By
There is a use case where users want to do some lookup in a UDF, very much
like a dimension table. It's common for them to cache some lookup results
in their UDF. If there is no 'partition by' from the source, they may need
to cache more data in each subtask.
Actually, what they need is `keyBy` from the DataStream world (see the
sketch below). We already support this feature in our production setup.
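
Roughly what users do on the DataStream side today (just an illustration;
the field names and the expensiveLookup() helper are made up):

    // imports: org.apache.flink.api.java.tuple.Tuple2,
    //          org.apache.flink.api.common.functions.RichMapFunction,
    //          org.apache.flink.configuration.Configuration, java.util.HashMap, java.util.Map
    // keyBy ensures that records with the same lookup key always go to the same
    // subtask, so each subtask only needs to cache the keys it actually receives.
    DataStream<Tuple2<String, Long>> enriched = input
        .keyBy(record -> record.f0)
        .map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            private transient Map<String, Long> cache; // per-subtask lookup cache

            @Override
            public void open(Configuration parameters) {
                cache = new HashMap<>();
            }

            @Override
            public Tuple2<String, Long> map(Tuple2<String, Long> value) {
                // expensiveLookup() stands in for the external lookup done in the UDF
                Long looked = cache.computeIfAbsent(value.f0, k -> expensiveLookup(k));
                return Tuple2.of(value.f0, looked);
            }
        });

This is exactly the `keyBy` semantic we would like to express on the source
in SQL, so that the per-subtask cache stays small.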

## Sink Partition By
Recently we also received some requirements about this feature.
Users want to add their custom sink, and need the data shuffled before the
sink.
They will do some thing for the data of the same partition key.

As an addition to the 'Source Partition By' semantic: actually, it's not
enough for the current use cases. The more common way to do this is to add
partition-by semantics to a 'view', so that users can do the 'keyBy'
multiple times in one query.

I have no strong opinions about these features; I just wanted to add some
use cases and would like to hear more opinions about this.


On Mon, Aug 31, 2020 at 7:09 PM Konstantin Knauf wrote:


Hi Jingsong,

I would like to understand this FLIP (?) a bit better, but I am missing
some background, I believe. So, some basic questions:

1) Does the PARTITION BY clause only have an effect for sink tables
defining how data should be partitioning the sink system or does it also
make a difference for source tables? My understanding is that it also

makes

a difference for source tables (e.g. if the source system
supports partition pruning). I suppose, for source tables Flink does not
check/enforce this, but trusts that the partitioning information is
correct?!

2) I suppose it is up to the connector implementation whether/how to
interpret the partition information. How will this work?

3) For Kafka, I suppose, the most common partitioning strategy is by key.
FLIP-107 contains a proposal on how to define the key (which fields of the
schema should become part of the key) when writing to Kafka via Flink SQL.
How does this relate to the PARTITION BY clause?

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-02 Thread Dawid Wysakowicz
Hi all,

A comment from my side on the topic of the current, weird
renaming/naming/reordering when registering a DataStream. It might be
just me, but I find it extremely confusing and I would be really, really
happy if we could simplify it.

I really don't like that the actual behaviour of this method depends on
the input type and set of used operations.

See some examples:

    public static class TestPojo {
        public int a;
        public String b;
        public long c;
    }

        DataStreamSource<TestPojo> ds = env.fromElements(new TestPojo());
        Table table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c")); // reordering of the fields
        table.printSchema();

        table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c").as("d")); // reordering with renaming
        table.printSchema();

        table = tableEnv.fromDataStream(ds, $("b"), $("c")); // projecting out the 1st field
        table.printSchema();

        DataStreamSource<Tuple3<Integer, String, Long>> ds1 = env.fromElements(Tuple3.of(1, "a", 1L));
        table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c")); // RENAMING without reordering!!! even though exact same arguments as in the 1st example
        table.printSchema();

        table = tableEnv.fromDataStream(ds1, $("b"), $("c")); // projecting out the 3rd field, even though exact same arguments as in the 3rd example
        table.printSchema();

        table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c").as("d")); // illegal renaming, exception is thrown
        table.printSchema();

Why can't we use established operations such as projections, which always
behave the same and where a field reference is always a field reference
(in the current solution it is either a reference or an alias), as
described in the FLIP?

If it is really a must to be able to rename the fields without using their
original names (I agree it is useful for tuples), I would very much
prefer to see:

tableEnv.fromDataStream(ds, "b", "a", "c"); <- always rename based on
the index, and then you can further apply projections (see the sketch below).
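
I.e., something along these lines (illustrative only -- index-based renaming
first, followed by an ordinary projection):

    // rename purely by position: field 0 -> "b", field 1 -> "a", field 2 -> "c"
    Table t = tableEnv
        .fromDataStream(ds, "b", "a", "c")
        .select($("b"), $("c")); // then project with plain field references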

Again, maybe I am the only one who finds it extremely confusing.

Best,

Dawid

On 02/09/2020 11:47, Jark Wu wrote:
> Hi Timo,
>
> 1. "fromDataStream VS fromInsertStream"
> In terms of naming, personally, I prefer `fromDataStream`,
> `fromChangelogStream`, `toDataStream`, `toChangelogStream` than
> `fromInsertStream`, `toInsertStream`.
>
> 2.  "fromDataStream(DataStream, Expression...) VS
> fromInsertStream(DataStream).select()"
> "fromDataStream" supports reference input fields by position, and fields
> are simply renamed.
> I think this is handy, however it is not supported in
> "fromInsertStream(DataStream).select()".
> Is it possible to keep using `fromDataStream(DataStream, Expression...)`
> but deprecate the support of `.rowtime()` and `.proctime()`.
> Instead, users should call `system_rowtime()` and `system_proctime()` if
> they want to derive the time attribute, e.g.
>
> DataStream> stream = ...
> Table table = tableEnv.fromDataStream(stream,
>$("a"), // rename the first field to 'a'
>$("b"), // rename the second field to 'b'
>system_rowtime().as("rowtime"), // extract the internally attached
> timestamp into an event-time
>system_proctime().as("proctime"));
>
> I think this will be more inline fluent, easy to validate, and make it
> possible to use the existing API. What do you think?
>
> 3. "name-based setters should always be based on fieldNames"
> +1 to have constant fieldName->index mapping. It will be more
> straightforward and avoid confusing.
> We can still introduce the dynamic field index mapping in the future if
> needed.
>
> Best,
> Jark
>
> On Wed, 2 Sep 2020 at 16:19, Timo Walther  wrote:
>
>> Hi everyone
>>
>> thanks for your feedback. It's a lot of content that needs to be
>> digested. I will update the FLIP shortly to incorporate some of the
>> feedback already. But let me respond to some topics first:
>>
>> "not deprecate these API", "the API of the table layer is changing too
>> fast"
>>
>> I agree that deprecating API is definitely not great for users, but in
>> this cases I think it is for the greater good it makes the API more
>> understandable and focuses on common use cases for the future. I would
>> rather say that the API is about to settle because there only a couple
>> of shortcomings left and the bigger picture is clearer than ever. IMO
>> The proposed changes are one of the last bigger API changes on the
>> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
>> methods just because we changed so much in the last releases should not
>> be a reason to keep confusing API. Users are happy to upgrade if they
>> also get more features by upgrading (e.g. fromChangelogStream).
>>
>> 1. "fromDataStream VS fromInsertStream"
>>
>> The main reason to change this API is to have the possibility to update
>> the type mapping without breaking backwards compatibility. The name
>> `fromInsertStream` makes it possible to have new semantics and makes

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-02 Thread Danny Chan
Thanks Timo ~

“No this is not possible, because T records have no changeflag. Without a
changeflag, a ChangelogMode makes not much sense. “

I agree, but just distinguishing the different ChangelogMode with a renamed
API does not really resolve the problem either. Between an API change and an
additional parameter, I would choose the latter.

“However, with
a schema you cannot specify the data type of the top-level record
itself”

What is a “top-level record”? Do you mean the physical type of the record?
From a Schema we can infer its original type though.

“Is it possible to keep using `fromDataStream(DataStream, Expression…)`”

From the SQL side, an Expression list usually means a computation (projection),
while here we actually want to define the schema of the stream (which is
static). Compared to "fromInsertStream(DataStream).select()", they actually
indicate the same thing at the API level. Although I would not vote for
`fromDataStream(DataStream, Expression…)`, it is still better than
`fromInsertStream(DataStream).select()`.

Best,
Danny Chan
On Sep 2, 2020, 4:19 PM +0800, Timo Walther wrote:
> Hi everyone
>
> thanks for your feedback. It's a lot of content that needs to be
> digested. I will update the FLIP shortly to incorporate some of the
> feedback already. But let me respond to some topics first:
>
> "not deprecate these API", "the API of the table layer is changing too fast"
>
> I agree that deprecating API is definitely not great for users, but in
> this cases I think it is for the greater good it makes the API more
> understandable and focuses on common use cases for the future. I would
> rather say that the API is about to settle because there only a couple
> of shortcomings left and the bigger picture is clearer than ever. IMO
> The proposed changes are one of the last bigger API changes on the
> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
> methods just because we changed so much in the last releases should not
> be a reason to keep confusing API. Users are happy to upgrade if they
> also get more features by upgrading (e.g. fromChangelogStream).
>
> 1. "fromDataStream VS fromInsertStream"
>
> The main reason to change this API is to have the possibility to update
> the type mapping without breaking backwards compatibility. The name
> `fromInsertStream` makes it possible to have new semantics and makes
> concepts more explicit by naming.
>
> 2. "toAppendStream VS toInsertStream"
>
> "Append" is common in the Flink community but the outside world uses
> "insert". Actually, the term "append-only table" is wrong because SQL
> tables have bag semantics without any order. So "appending" is more of
> an "insertion". This is also represented in FLIP-95's `RowKind` where we
> call the concepts INSERT and `ChangelogKind.insertOnly`.
>
> 3. "`.rowtime()` and `.proctime()`"
>
> "API is also widely used, even in our test code"
>
> The test code is already quite outdated and uses a lot of deprecated
> API. We need to deal with that with a better testing infrastructure. But
> this can be future work.
>
> "users have already accepted it"
>
> I'm not sure if users have already accepted it. I think we get at least
> one question around this topic every week because users would like to
> call `.rowtime` on arbitrary timestamps in the middle of the pipeline.
> And specifying all fields just to give the StreamRecord timestamp a name
> should be made easier. This is necessary in 80% of all use cases.
>
> 4. "toAppendStream(Table table, Class/TypeInformation)"
>
> The DataType system is way easier than the TypeInformation system
> because it provides a consistent look and feel with a lot of utilities.
> E.g. many users didn't know that they can just pass `Row.class` in the
> past. Actually extracting types from a `Row.class` is not supported by
> the TypeExtractor (we recently even printed a warning to the logs) but
> we hacked some logic into the method. With AbstractDataType, users can
> still use classes via `DataTypes.of`; for example
> `toInsertStream(DataTypes.of(MyPojo.class))`.
>
> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>
> Similar to `TableEnvironment.execute()` we did some mistakes during the
> big refactorings. IMHO tEnv#createTemporaryView was one mistake because
> we introduced it too quickly. In general this method is correct, but now
> we cannot change the underlying semantics again without breaking
> existing pipelines. We could keep this method and just change the type
> system under the hood, in most of the cases the pipeline should still
> work but we cannot guarantee this due to slight differences.
>
> 6. "could it be "StreamTableEnvironment.fromDataStream(DataStream,
> ChangelogMode)"
>
> No this is not possible, because T records have no changeflag. Without a
> changeflag, a ChangelogMode makes not much sense. That's why
> `from/toChangelogStream` supports only `Row` whereas the
> `from/toInsertStream` accepts arbitrary type classes.
>
> 

Re: [DISCUSS] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-02 Thread Xintong Song
Thanks for the input, Yu.

I believe the current proposal should work with RocksDB, or any other state
backend, using memory at either the slot or the operator scope. With the
proposed approach, all we need is an indicator (e.g., a configuration option)
telling us which scope we should calculate the fractions for.
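
Just to illustrate the calculation I have in mind (a sketch only, not the
actual implementation): given the configured weights, each consumer kind in
use gets weight / sum-of-weights-in-use of the managed memory at the chosen
scope.

    // Illustrative only: derive managed-memory fractions from configured weights.
    // 'consumersInUse' would be determined per slot or per operator, depending on
    // which scope the indicator/config option selects.
    static Map<String, Double> fractions(Map<String, Integer> weights, Set<String> consumersInUse) {
        int total = consumersInUse.stream().mapToInt(c -> weights.getOrDefault(c, 0)).sum();
        Map<String, Double> result = new HashMap<>();
        for (String consumer : consumersInUse) {
            result.put(consumer, total == 0 ? 0.0 : (double) weights.getOrDefault(consumer, 0) / total);
        }
        return result;
    }

    // e.g. weights = {STATE_BACKEND_OR_BATCH: 70, PYTHON: 30}; if only the state
    // backend and Python are in use, they get 0.7 and 0.3 of the slot's managed memory.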

Thank you~

Xintong Song



On Wed, Sep 2, 2020 at 4:53 PM Yu Li  wrote:

> Thanks for compiling the FLIP Xintong, and +1 for the updated doc.
>
> Just one supplement for the RocksDB state backend part:
>
> It's true that currently we're using managed memory at the slot scope.
> However, IMHO, we may support setting weights for different stateful
> operators (for advanced usage) in future. For example, users may choose to
> set higher weights for join operator over aggregation operator, to give
> more memory to those with bigger states. In this case, we may also use
> managed memory at the operator scope for state backends. And if I
> understand correctly, the current design could cover this case well.
>
> Best Regards,
> Yu
>
>
> On Wed, 2 Sep 2020 at 15:39, Xintong Song  wrote:
>
> > Thanks all for the feedback and discussion.
> >
> > I have updated the FLIP, with the following changes.
> >
> >- Choose the main proposal over the alternative approach
> >- Combine weights of RocksDB and batch operators
> >- Expose weights through configuration options, rather than via
> >ExecutionConfig.
> >- Add implementation plan.
> >
> > Please help take another look.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Sep 2, 2020 at 2:41 PM Xintong Song 
> wrote:
> >
> > > Thanks for the inputs, Aljoscha & Till.
> > >
> > >
> > > # Weight Configuration
> > >
> > >
> > > I think exposing the knobs incrementally is a good idea. However, I'm
> not
> > > sure about non-configurable as the first step.
> > >
> > >
> > > Currently, users can tune memory for rocksdb
> > > ('taskmanager.memory.managed.size') and python
> > > ('python.fn-execution.[framework|buffer].memory.size') separately,
> which
> > > practically means any combination of rocksdb and python memory sizes.
> If
> > we
> > > switch to non-configurable weights, that will be a regression compared
> to
> > > 1.11.
> > >
> > >
> > > Therefore, I think exposing via configuration options might be a good
> > > first step. And we can discuss exposing via ExecutionConfig if later we
> > see
> > > that requirement.
> > >
> > >
> > > # Naming of Weights
> > >
> > >
> > > I'm neutral for "Flink/Internal memory".
> > >
> > >
> > > I think the reason we can combine weights for batch algorithms and
> state
> > > backends is that they are never mixed together. My only concern
> > > for "Flink/Internal memory", which might not be a problem at the
> moment,
> > is
> > > that what if new memory use cases appear in the future, which can also
> be
> > > described by "Flink/Internal memory" but is not guaranteed not mixed
> with
> > > batch algorithms or state backends?
> > >
> > >
> > > Anyway, I think the naming should not block this FLIP, as long as we
> have
> > > consensus on combining the two weights for rocksdb and batch
> algorithms.
> > We
> > > can keep the naming discussion open until the implementation phase.
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Sep 1, 2020 at 10:19 PM Till Rohrmann 
> > > wrote:
> > >
> > >> Thanks for creating this FLIP Xintong.
> > >>
> > >> I agree with the previous comments that the memory configuration
> should
> > be
> > >> as easy as possible. Every new knob has the potential to confuse users
> > >> and/or allows him to shoot himself in the foot. Consequently, I am +1
> > for
> > >> the first proposal in the FLIP since it is simpler.
> > >>
> > >> Also +1 for Stephan's proposal to combine batch operator's and
> > >> RocksDB's memory usage into one weight.
> > >>
> > >> Concerning the names for the two weights, I fear that we are facing
> one
> > of
> > >> the two hard things in computer science. To add another idea, we could
> > >> name
> > >> them "Flink memory"/"Internal memory" and "Python memory".
> > >>
> > >> For the sake of making the scope of the FLIP as small as possible and
> to
> > >> develop the feature incrementally, I think that Aljoscha's proposal to
> > >> make
> > >> it non-configurable for the first step sounds like a good idea. As a
> > next
> > >> step (and also if we see need), we can make the memory weights
> > >> configurable
> > >> via the configuration. And last, we could expose it via the
> > >> ExecutionConfig
> > >> if it is required.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Tue, Sep 1, 2020 at 2:24 PM Aljoscha Krettek 
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > playing devils advocate here: should we even make the memory weights
> > >> > configurable? We could go with weights that should make sense for
> most
> > >> > cases in the first version and only introduce configurable weights
> > when
> >> > (if) users need them.

[jira] [Created] (FLINK-19123) TestStreamEnvironment does not use shared MiniCluster for executeAsync()

2020-09-02 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-19123:


 Summary: TestStreamEnvironment does not use shared MiniCluster for 
executeAsync()
 Key: FLINK-19123
 URL: https://issues.apache.org/jira/browse/FLINK-19123
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Runtime / Coordination, Tests
Reporter: Aljoscha Krettek


TestStreamEnvironment does override {{execute()}} but not {{executeAsync()}}.
Now, {{execute()}} goes against the {{MiniCluster}} session that was started by
a {{MiniClusterWithClientResource}} or some other method that uses
{{TestStreamEnvironment}}. However, {{executeAsync()}} will go through the
normal {{StreamExecutionEnvironment}} logic and tries to find an executor; it
does not know that it is a testing environment.

Up until recently, you would have gotten an exception that tells you that no 
executor is configured, then we would have found out that we need to override 
{{executeAsync()}} in {{TestStreamEnvironment}}. However, we currently 
configure a local executor in the constructor: 
[https://github.com/apache/flink/blob/2160c3294ef87143ab9a4e8138cb618651499792/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java#L59].
 With this, you basically get the “local environment” behaviour when you call 
{{executeAsync()}}, which starts a cluster for the job and shuts it down when 
the job finishes. This basically makes the {{TestStreamEnvironment}} cluster 
sharing useless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-02 Thread Jark Wu
Hi Timo,

1. "fromDataStream VS fromInsertStream"
In terms of naming, personally, I prefer `fromDataStream`,
`fromChangelogStream`, `toDataStream`, `toChangelogStream` than
`fromInsertStream`, `toInsertStream`.

2.  "fromDataStream(DataStream, Expression...) VS
fromInsertStream(DataStream).select()"
"fromDataStream" supports reference input fields by position, and fields
are simply renamed.
I think this is handy, however it is not supported in
"fromInsertStream(DataStream).select()".
Is it possible to keep using `fromDataStream(DataStream, Expression...)`
but deprecate the support of `.rowtime()` and `.proctime()`.
Instead, users should call `system_rowtime()` and `system_proctime()` if
they want to derive the time attribute, e.g.

DataStream> stream = ...
Table table = tableEnv.fromDataStream(stream,
   $("a"), // rename the first field to 'a'
   $("b"), // rename the second field to 'b'
   system_rowtime().as("rowtime"), // extract the internally attached
timestamp into an event-time
   system_proctime().as("proctime"));

I think this will be more fluent and inline, easy to validate, and makes it
possible to keep using the existing API. What do you think?

3. "name-based setters should always be based on fieldNames"
+1 to have constant fieldName->index mapping. It will be more
straightforward and avoid confusing.
We can still introduce the dynamic field index mapping in the future if
needed.

Best,
Jark

On Wed, 2 Sep 2020 at 16:19, Timo Walther  wrote:

> Hi everyone
>
> thanks for your feedback. It's a lot of content that needs to be
> digested. I will update the FLIP shortly to incorporate some of the
> feedback already. But let me respond to some topics first:
>
> "not deprecate these API", "the API of the table layer is changing too
> fast"
>
> I agree that deprecating API is definitely not great for users, but in
> this cases I think it is for the greater good it makes the API more
> understandable and focuses on common use cases for the future. I would
> rather say that the API is about to settle because there only a couple
> of shortcomings left and the bigger picture is clearer than ever. IMO
> The proposed changes are one of the last bigger API changes on the
> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
> methods just because we changed so much in the last releases should not
> be a reason to keep confusing API. Users are happy to upgrade if they
> also get more features by upgrading (e.g. fromChangelogStream).
>
> 1. "fromDataStream VS fromInsertStream"
>
> The main reason to change this API is to have the possibility to update
> the type mapping without breaking backwards compatibility. The name
> `fromInsertStream` makes it possible to have new semantics and makes
> concepts more explicit by naming.
>
> 2. "toAppendStream VS toInsertStream"
>
> "Append" is common in the Flink community but the outside world uses
> "insert". Actually, the term "append-only table" is wrong because SQL
> tables have bag semantics without any order. So "appending" is more of
> an "insertion". This is also represented in FLIP-95's `RowKind` where we
> call the concepts INSERT and `ChangelogKind.insertOnly`.
>
> 3. "`.rowtime()` and `.proctime()`"
>
> "API is also widely used, even in our test code"
>
> The test code is already quite outdated and uses a lot of deprecated
> API. We need to deal with that with a better testing infrastructure. But
> this can be future work.
>
> "users have already accepted it"
>
> I'm not sure if users have already accepted it. I think we get at least
> one question around this topic every week because users would like to
> call `.rowtime` on arbitrary timestamps in the middle of the pipeline.
> And specifying all fields just to give the StreamRecord timestamp a name
> should be made easier. This is necessary in 80% of all use cases.
>
> 4. "toAppendStream(Table table, Class/TypeInformation)"
>
> The DataType system is way easier than the TypeInformation system
> because it provides a consistent look and feel with a lot of utilities.
> E.g. many users didn't know that they can just pass `Row.class` in the
> past. Actually extracting types from a `Row.class` is not supported by
> the TypeExtractor (we recently even printed a warning to the logs) but
> we hacked some logic into the method. With AbstractDataType, users can
> still use classes via `DataTypes.of`; for example
> `toInsertStream(DataTypes.of(MyPojo.class))`.
>
> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>
> Similar to `TableEnvironment.execute()` we did some mistakes during the
> big refactorings. IMHO tEnv#createTemporaryView was one mistake because
> we introduced it too quickly. In general this method is correct, but now
> we cannot change the underlying semantics again without breaking
> existing pipelines. We could keep this method and just change the type
> system under the hood, in most of the cases the pipeline should still
> work but we cannot guarantee this due to slight differences.

Re: [DISCUSS] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-02 Thread Yu Li
Thanks for compiling the FLIP Xintong, and +1 for the updated doc.

Just one supplement for the RocksDB state backend part:

It's true that currently we're using managed memory at the slot scope.
However, IMHO, we may support setting weights for different stateful
operators (for advanced usage) in future. For example, users may choose to
set higher weights for join operator over aggregation operator, to give
more memory to those with bigger states. In this case, we may also use
managed memory at the operator scope for state backends. And if I
understand correctly, the current design could cover this case well.

Best Regards,
Yu


On Wed, 2 Sep 2020 at 15:39, Xintong Song  wrote:

> Thanks all for the feedback and discussion.
>
> I have updated the FLIP, with the following changes.
>
>- Choose the main proposal over the alternative approach
>- Combine weights of RocksDB and batch operators
>- Expose weights through configuration options, rather than via
>ExecutionConfig.
>- Add implementation plan.
>
> Please help take another look.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Sep 2, 2020 at 2:41 PM Xintong Song  wrote:
>
> > Thanks for the inputs, Aljoscha & Till.
> >
> >
> > # Weight Configuration
> >
> >
> > I think exposing the knobs incrementally is a good idea. However, I'm not
> > sure about non-configurable as the first step.
> >
> >
> > Currently, users can tune memory for rocksdb
> > ('taskmanager.memory.managed.size') and python
> > ('python.fn-execution.[framework|buffer].memory.size') separately, which
> > practically means any combination of rocksdb and python memory sizes. If
> we
> > switch to non-configurable weights, that will be a regression compared to
> > 1.11.
> >
> >
> > Therefore, I think exposing via configuration options might be a good
> > first step. And we can discuss exposing via ExecutionConfig if later we
> see
> > that requirement.
> >
> >
> > # Naming of Weights
> >
> >
> > I'm neutral for "Flink/Internal memory".
> >
> >
> > I think the reason we can combine weights for batch algorithms and state
> > backends is that they are never mixed together. My only concern
> > for "Flink/Internal memory", which might not be a problem at the moment,
> is
> > that what if new memory use cases appear in the future, which can also be
> > described by "Flink/Internal memory" but is not guaranteed not mixed with
> > batch algorithms or state backends?
> >
> >
> > Anyway, I think the naming should not block this FLIP, as long as we have
> > consensus on combining the two weights for rocksdb and batch algorithms.
> We
> > can keep the naming discussion open until the implementation phase.
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Sep 1, 2020 at 10:19 PM Till Rohrmann 
> > wrote:
> >
> >> Thanks for creating this FLIP Xintong.
> >>
> >> I agree with the previous comments that the memory configuration should
> be
> >> as easy as possible. Every new knob has the potential to confuse users
> >> and/or allows him to shoot himself in the foot. Consequently, I am +1
> for
> >> the first proposal in the FLIP since it is simpler.
> >>
> >> Also +1 for Stephan's proposal to combine batch operator's and
> >> RocksDB's memory usage into one weight.
> >>
> >> Concerning the names for the two weights, I fear that we are facing one
> of
> >> the two hard things in computer science. To add another idea, we could
> >> name
> >> them "Flink memory"/"Internal memory" and "Python memory".
> >>
> >> For the sake of making the scope of the FLIP as small as possible and to
> >> develop the feature incrementally, I think that Aljoscha's proposal to
> >> make
> >> it non-configurable for the first step sounds like a good idea. As a
> next
> >> step (and also if we see need), we can make the memory weights
> >> configurable
> >> via the configuration. And last, we could expose it via the
> >> ExecutionConfig
> >> if it is required.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Sep 1, 2020 at 2:24 PM Aljoscha Krettek 
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > playing devils advocate here: should we even make the memory weights
> >> > configurable? We could go with weights that should make sense for most
> >> > cases in the first version and only introduce configurable weights
> when
> >> > (if) users need them.
> >> >
> >> > Regarding where/how things are configured, I think that most things
> >> > should be a ConfigOption first (Thanks cc'in me, Stephan!). This makes
> >> > them configurable via flink-conf.yaml and via command line parameters,
> >> > for example "bin/flink run -D memory.foo=bla ...". We can think about
> >> > offering programmatic API for cases where it makes sense, of course.
> >> >
> >> > Regarding naming one of the configurable weights
> >> > "StateBackend-BatchAlgorithm". I think it's not a good idea to be that
> >> > specific because the option will not age well. For example when we
> want
> >> > to change which group of memory consumers are configured together or
> >> > when we add something new.

[jira] [Created] (FLINK-19122) Prometheus scrape generates huge scrape target.

2020-09-02 Thread Harold Dost III (Jira)
Harold Dost III created FLINK-19122:
---

 Summary: Prometheus scrape generates huge scrape target.
 Key: FLINK-19122
 URL: https://issues.apache.org/jira/browse/FLINK-19122
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Harold Dost III


Based on the sheer size of the labels, we are regularly seeing output of 26M
from the scrape target; it seems to be mostly due to the "task_name" label.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-02 Thread Timo Walther

Hi everyone

thanks for your feedback. It's a lot of content that needs to be 
digested. I will update the FLIP shortly to incorporate some of the 
feedback already. But let me respond to some topics first:


"not deprecate these API", "the API of the table layer is changing too fast"

I agree that deprecating API is definitely not great for users, but in
this case I think it is for the greater good: it makes the API more
understandable and focuses on common use cases for the future. I would
rather say that the API is about to settle, because there are only a couple
of shortcomings left and the bigger picture is clearer than ever. IMO
the proposed changes are one of the last bigger API changes on the
roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
methods just because we changed so much in the last releases should not
be a reason to keep a confusing API. Users are happy to upgrade if they
also get more features by upgrading (e.g. fromChangelogStream).


1. "fromDataStream VS fromInsertStream"

The main reason to change this API is to have the possibility to update 
the type mapping without breaking backwards compatibility. The name 
`fromInsertStream` makes it possible to have new semantics and makes 
concepts more explicit by naming.


2. "toAppendStream VS toInsertStream"

"Append" is common in the Flink community but the outside world uses 
"insert". Actually, the term "append-only table" is wrong because SQL 
tables have bag semantics without any order. So "appending" is more of 
an "insertion". This is also represented in FLIP-95's `RowKind` where we 
call the concepts INSERT and `ChangelogKind.insertOnly`.


3. "`.rowtime()` and `.proctime()`"

"API is also widely used, even in our test code"

The test code is already quite outdated and uses a lot of deprecated 
API. We need to deal with that with a better testing infrastructure. But 
this can be future work.


"users have already accepted it"

I'm not sure if users have already accepted it. I think we get at least 
one question around this topic every week because users would like to 
call `.rowtime` on arbitrary timestamps in the middle of the pipeline. 
And specifying all fields just to give the StreamRecord timestamp a name 
should be made easier. This is necessary in 80% of all use cases.


4. "toAppendStream(Table table, Class/TypeInformation)"

The DataType system is way easier than the TypeInformation system 
because it provides a consistent look and feel with a lot of utilities. 
E.g. many users didn't know that they can just pass `Row.class` in the 
past. Actually extracting types from a `Row.class` is not supported by 
the TypeExtractor (we recently even printed a warning to the logs) but 
we hacked some logic into the method. With AbstractDataType, users can 
still use classes via `DataTypes.of`; for example 
`toInsertStream(DataTypes.of(MyPojo.class))`.
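
As a sketch of how this would look with the proposed API (toInsertStream is
part of this proposal, not existing API; MyPojo stands for any user POJO):

    // Proposed API only -- toInsertStream(...) does not exist yet.
    // DataTypes.of(Class) is existing API and gives a consistent entry point.
    DataStream<MyPojo> result =
        tableEnv.toInsertStream(table, DataTypes.of(MyPojo.class));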


5. "tEnv#createTemporaryView was introduced in release-1.10"

Similar to `TableEnvironment.execute()`, we made some mistakes during the
big refactorings. IMHO tEnv#createTemporaryView was one mistake because
we introduced it too quickly. In general this method is correct, but now 
we cannot change the underlying semantics again without breaking 
existing pipelines. We could keep this method and just change the type 
system under the hood, in most of the cases the pipeline should still 
work but we cannot guarantee this due to slight differences.


6. "could it be "StreamTableEnvironment.fromDataStream(DataStream, 
ChangelogMode)"


No, this is not possible, because T records have no changeflag. Without a
changeflag, a ChangelogMode does not make much sense. That's why
`from/toChangelogStream` supports only `Row`, whereas
`from/toInsertStream` accepts arbitrary type classes.
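
To make the difference concrete (a sketch, assuming Row's RowKind accessors
introduced around FLIP-95):

    // A Row carries a change flag (RowKind) in addition to its fields, which is
    // why the proposed from/toChangelogStream works on Row. An arbitrary POJO or
    // tuple type T has no such flag.
    Row retraction = new Row(2);
    retraction.setKind(RowKind.UPDATE_BEFORE); // -U: the old value being retracted
    retraction.setField(0, "user-1");
    retraction.setField(1, 42L);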


7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"

I also thought about this method and using `Schema` there. However, with 
a schema you cannot specify the data type of the top-level record 
itself. We would need to offer fromDataStream(dataStream, Schema, 
DataType) or integrate the DataType into the Schema class itself which 
would mix up the concepts.


8. "name-based setters should always be based on fieldNames"

I'm fine with throwing an exception, if the semantics I mentioned are too
confusing.


Regards,
Timo



On 02.09.20 07:25, Jingsong Li wrote:

a Row has two modes represented by an internal boolean flag

`hasFieldOrder`

+1, I share Dawid's confusion about what the result is when index-based
setters and name-based setters are mixed.
And name-based setters look like append instead of set.

It reminds me of Avro's `GenericRecord`. We should support truly random
name-based setters instead of append.

So, what I think is: name-based setters should always be based
on fieldNames, just like name-based getters. Otherwise, throw an exception.

Best,
Jingsong

On Wed, Sep 2, 2020 at 12:43 PM Danny Chan  wrote:


Timo, Thanks for the discussion

I have only read the 

Re: [DISCUSS] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-02 Thread Xintong Song
Thanks all for the feedback and discussion.

I have updated the FLIP, with the following changes.

   - Choose the main proposal over the alternative approach
   - Combine weights of RocksDB and batch operators
   - Expose weights through configuration options, rather than via
   ExecutionConfig.
   - Add implementation plan.

Please help take another look.

Thank you~

Xintong Song



On Wed, Sep 2, 2020 at 2:41 PM Xintong Song  wrote:

> Thanks for the inputs, Aljoscha & Till.
>
>
> # Weight Configuration
>
>
> I think exposing the knobs incrementally is a good idea. However, I'm not
> sure about non-configurable as the first step.
>
>
> Currently, users can tune memory for rocksdb
> ('taskmanager.memory.managed.size') and python
> ('python.fn-execution.[framework|buffer].memory.size') separately, which
> practically means any combination of rocksdb and python memory sizes. If we
> switch to non-configurable weights, that will be a regression compared to
> 1.11.
>
>
> Therefore, I think exposing via configuration options might be a good
> first step. And we can discuss exposing via ExecutionConfig if later we see
> that requirement.
>
>
> # Naming of Weights
>
>
> I'm neutral for "Flink/Internal memory".
>
>
> I think the reason we can combine weights for batch algorithms and state
> backends is that they are never mixed together. My only concern
> for "Flink/Internal memory", which might not be a problem at the moment, is
> that what if new memory use cases appear in the future, which can also be
> described by "Flink/Internal memory" but is not guaranteed not mixed with
> batch algorithms or state backends?
>
>
> Anyway, I think the naming should not block this FLIP, as long as we have
> consensus on combining the two weights for rocksdb and batch algorithms. We
> can keep the naming discussion open until the implementation phase.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 1, 2020 at 10:19 PM Till Rohrmann 
> wrote:
>
>> Thanks for creating this FLIP Xintong.
>>
>> I agree with the previous comments that the memory configuration should be
>> as easy as possible. Every new knob has the potential to confuse users
>> and/or allows him to shoot himself in the foot. Consequently, I am +1 for
>> the first proposal in the FLIP since it is simpler.
>>
>> Also +1 for Stephan's proposal to combine batch operator's and
>> RocksDB's memory usage into one weight.
>>
>> Concerning the names for the two weights, I fear that we are facing one of
>> the two hard things in computer science. To add another idea, we could
>> name
>> them "Flink memory"/"Internal memory" and "Python memory".
>>
>> For the sake of making the scope of the FLIP as small as possible and to
>> develop the feature incrementally, I think that Aljoscha's proposal to
>> make
>> it non-configurable for the first step sounds like a good idea. As a next
>> step (and also if we see need), we can make the memory weights
>> configurable
>> via the configuration. And last, we could expose it via the
>> ExecutionConfig
>> if it is required.
>>
>> Cheers,
>> Till
>>
>> On Tue, Sep 1, 2020 at 2:24 PM Aljoscha Krettek 
>> wrote:
>>
>> > Hi,
>> >
>> > playing devils advocate here: should we even make the memory weights
>> > configurable? We could go with weights that should make sense for most
>> > cases in the first version and only introduce configurable weights when
>> > (if) users need them.
>> >
>> > Regarding where/how things are configured, I think that most things
>> > should be a ConfigOption first (Thanks cc'in me, Stephan!). This makes
>> > them configurable via flink-conf.yaml and via command line parameters,
>> > for example "bin/flink run -D memory.foo=bla ...". We can think about
>> > offering programmatic API for cases where it makes sense, of course.
>> >
>> > Regarding naming one of the configurable weights
>> > "StateBackend-BatchAlgorithm". I think it's not a good idea to be that
>> > specific because the option will not age well. For example when we want
>> > to change which group of memory consumers are configured together or
>> > when we add something new.
>> >
>> > Best,
>> > Aljoscha
>> >
>> > On 31.08.20 08:13, Xintong Song wrote:
>> > > Thanks for the feedbacks, @Stephan
>> > >
>> > >
>> > >- There is a push to make as much as possible configurable via the
>> > main
>> > >> configuration, and not only in code. Specifically values for
>> operations
>> > and
>> > >> tuning.
>> > >>  I think it would be more important to have such memory weights
>> in
>> > the
>> > >> config, compared to in the program API. /cc Aljoscha
>> > >
>> > >
>> > > I can see the benefit that having memory weights in the main
>> > configuration
>> > > makes tuning easier, which makes great sense to me. On the other hand,
>> > what
>> > > we lose is the flexibility to have different weights for jobs running
>> in
>> > > the same Flink cluster. It seems to me the problem is that we don't
>> have
>> > an
> > > easy way to overwrite job-specific configurations without touching the
> > > codes.

Re: [DISCUSS] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-02 Thread Xintong Song
Thanks for the inputs, Aljoscha & Till.


# Weight Configuration


I think exposing the knobs incrementally is a good idea. However, I'm not
sure about making the weights non-configurable as the first step.


Currently, users can tune memory for rocksdb
('taskmanager.memory.managed.size') and python
('python.fn-execution.[framework|buffer].memory.size') separately, which
practically means any combination of rocksdb and python memory sizes. If we
switch to non-configurable weights, that will be a regression compared to
1.11.


Therefore, I think exposing via configuration options might be a good first
step. And we can discuss exposing via ExecutionConfig if later we see that
requirement.


# Naming of Weights


I'm neutral for "Flink/Internal memory".


I think the reason we can combine the weights for batch algorithms and state
backends is that they are never mixed together. My only concern
for "Flink/Internal memory", which might not be a problem at the moment, is:
what if new memory use cases appear in the future that could also be
described as "Flink/Internal memory" but are not guaranteed never to be mixed
with batch algorithms or state backends?


Anyway, I think the naming should not block this FLIP, as long as we have
consensus on combining the two weights for rocksdb and batch algorithms. We
can keep the naming discussion open until the implementation phase.


Thank you~

Xintong Song



On Tue, Sep 1, 2020 at 10:19 PM Till Rohrmann  wrote:

> Thanks for creating this FLIP Xintong.
>
> I agree with the previous comments that the memory configuration should be
> as easy as possible. Every new knob has the potential to confuse users
> and/or allows him to shoot himself in the foot. Consequently, I am +1 for
> the first proposal in the FLIP since it is simpler.
>
> Also +1 for Stephan's proposal to combine batch operator's and
> RocksDB's memory usage into one weight.
>
> Concerning the names for the two weights, I fear that we are facing one of
> the two hard things in computer science. To add another idea, we could name
> them "Flink memory"/"Internal memory" and "Python memory".
>
> For the sake of making the scope of the FLIP as small as possible and to
> develop the feature incrementally, I think that Aljoscha's proposal to make
> it non-configurable for the first step sounds like a good idea. As a next
> step (and also if we see need), we can make the memory weights configurable
> via the configuration. And last, we could expose it via the ExecutionConfig
> if it is required.
>
> Cheers,
> Till
>
> On Tue, Sep 1, 2020 at 2:24 PM Aljoscha Krettek 
> wrote:
>
> > Hi,
> >
> > playing devils advocate here: should we even make the memory weights
> > configurable? We could go with weights that should make sense for most
> > cases in the first version and only introduce configurable weights when
> > (if) users need them.
> >
> > Regarding where/how things are configured, I think that most things
> > should be a ConfigOption first (Thanks cc'in me, Stephan!). This makes
> > them configurable via flink-conf.yaml and via command line parameters,
> > for example "bin/flink run -D memory.foo=bla ...". We can think about
> > offering programmatic API for cases where it makes sense, of course.
> >
> > Regarding naming one of the configurable weights
> > "StateBackend-BatchAlgorithm". I think it's not a good idea to be that
> > specific because the option will not age well. For example when we want
> > to change which group of memory consumers are configured together or
> > when we add something new.
> >
> > Best,
> > Aljoscha
> >
> > On 31.08.20 08:13, Xintong Song wrote:
> > > Thanks for the feedbacks, @Stephan
> > >
> > >
> > >- There is a push to make as much as possible configurable via the
> > main
> > >> configuration, and not only in code. Specifically values for
> operations
> > and
> > >> tuning.
> > >>  I think it would be more important to have such memory weights in
> > the
> > >> config, compared to in the program API. /cc Aljoscha
> > >
> > >
> > > I can see the benefit that having memory weights in the main
> > configuration
> > > makes tuning easier, which makes great sense to me. On the other hand,
> > what
> > > we lose is the flexibility to have different weights for jobs running
> in
> > > the same Flink cluster. It seems to me the problem is that we don't
> have
> > an
> > > easy way to overwrite job-specific configurations without touching the
> > > codes.
> > >
> > >
> > > Given the current status, what if we make the memory weights
> configurable
> > > through both the main configuration and the programming API? The main
> > > configuration should take effect iff the weights are not explicitly
> > > specified through the programming API. In this way, job cluster users
> can
> > > easily tune the weight through the main configuration, while session
> > > cluster users, if they want to have different weights for jobs, can
> still
> > > overwrite the weight through execution configs.
> > >
> > >
> > >-