Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-05-30 Thread Leonard Xu
Thanks @Samrat for bringing this discussion.

It makes sense to me to introduce AWS Redshift connector for Apache Flink, and 
I’m glad to help review the design as well as the code review.

About the implementation phases, how about prioritizing support for the 
DataStream Sink API and TableSink API in the first phase? It seems that the 
primary use case for the Redshift connector is acting as a sink for data 
processed by Flink.

Best,
Leonard


> On May 29, 2023, at 12:51 PM, Samrat Deb  wrote:
> 
> Hello all ,
> 
> Context:
> Amazon Redshift [1] is a fully managed, petabyte-scale data warehouse
> service in the cloud. It allows analyzing data without all of the
> configurations of a provisioned data warehouse. Resources are automatically
> provisioned and data warehouse capacity is intelligently scaled to deliver
> fast performance for even the most demanding and unpredictable workloads.
> Redshift is one of the widely used warehouse solutions in the current
> market.
> 
> Building a Flink connector for Redshift will allow Flink users to have a source
> and sink directly to Redshift. It will help Flink expand its scope to Redshift
> as a new connector in the ecosystem.
> 
> I would like to start a discussion on the FLIP-307: Flink connector
> redshift [2].
> Looking forward to comments, feedbacks and suggestions from the community
> on the proposal.
> 
> [1] https://docs.aws.amazon.com/redshift/latest/mgmt/welcome.html
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
> 
> 
> 
> Bests,
> Samrat



Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-05-30 Thread Jingsong Li
+1, the fallback looks weird now, it is outdated.

But, it is good to provide an option. I don't know if there are some
users who depend on this fallback.

Best,
Jingsong

On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
>
> +1, the fallback was just intended as a temporary workaround to run 
> catalog/module related statements with hive dialect.
>
> On Mon, May 29, 2023 at 3:59 PM Benchao Li  wrote:
>>
>> Big +1 on this, thanks yuxia for driving this!
>>
>> yuxia  于2023年5月29日周一 14:55写道:
>>
>> > Hi, community.
>> >
>> > I want to start a discussion about no longer falling back from the Hive
>> > dialect to Flink's default dialect.
>> >
>> > Currently, when the HiveParser fails to parse a SQL statement in Hive dialect,
>> > it falls back to Flink's default parser[1] to handle Flink-specific
>> > statements like "CREATE CATALOG xx with (xx);".
>> >
>> > As I'm involved with the Hive dialect and have recently had some communication
>> > with community users who use it, I'm thinking of throwing an exception
>> > directly instead of falling back to Flink's default dialect when parsing a
>> > SQL statement in Hive dialect fails.
>> >
>> > Here're some reasons:
>> >
>> > First of all, it hides errors in the Hive dialect. For example, we
>> > found we couldn't use the Hive dialect with the Flink SQL client anymore during
>> > the release validation phase[2]. We eventually found that a modification in the
>> > Flink SQL client caused it, but our test case couldn't catch it earlier:
>> > although the HiveParser failed to parse the statement, it fell back to the
>> > default parser and the test case still passed.
>> >
>> > Second, conceptually, the Hive dialect should have nothing to do with Flink's
>> > default dialect. They are two totally different dialects. If we do need a
>> > dialect mixing the Hive dialect and the default dialect, maybe we should propose
>> > a new hybrid dialect and announce the hybrid behavior to users.
>> > Also, the fallback behavior has confused some users; I have been asked about it
>> > by community users. Throwing an exception directly when parsing a SQL statement
>> > in Hive dialect fails would be more intuitive.
>> >
>> > Last but not least, it's important to decouple Hive from the Flink planner[3]
>> > before we can externalize the Hive connector[4]. If we still fall back to Flink's
>> > default dialect, then we will need to depend on `ParserImpl` in the Flink planner,
>> > which will block us from removing the provided dependency of the Hive dialect as
>> > well as externalizing the Hive connector.
>> >
>> > Although we have never announced the fallback behavior, some users
>> > may implicitly depend on it in their SQL jobs. So I hereby
>> > open the discussion about abandoning the fallback behavior to make the Hive
>> > dialect clear and isolated.
>> > Please note it won't break Hive syntax, but syntax specific
>> > to Flink may fail afterwards. For the failing SQL, you can use `SET
>> > table.sql-dialect=default;` to switch to the Flink dialect (see the small
>> > sketch after this thread).
>> > If there are Flink-specific statements that we find should be included in the
>> > Hive dialect for ease of use, I think we can still add them as special
>> > cases to the Hive dialect.
>> >
>> > Looking forward to your feedback. I'd love to hear the community's thoughts
>> > before taking the next steps.
>> >
>> > [1]:
>> > https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
>> > [2]:https://issues.apache.org/jira/browse/FLINK-26681
>> > [3]:https://issues.apache.org/jira/browse/FLINK-31413
>> > [4]:https://issues.apache.org/jira/browse/FLINK-30064
>> >
>> >
>> >
>> > Best regards,
>> > Yuxia
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>
>
>
> --
> Best regards!
> Rui Li
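
For readers who want to see what the dialect switch mentioned above looks like in
practice, here is a minimal, self-contained sketch using the Table API; the catalog
name and statements are illustrative only, and running actual Hive-dialect DDL
additionally requires a registered HiveCatalog and the Hive dependencies:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class DialectSwitchSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hive-syntax statements run with the Hive dialect...
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // tEnv.executeSql("...");  // Hive-specific DDL/DML would go here

        // ...and Flink-specific statements run after switching back to the
        // default dialect explicitly, instead of relying on the fallback
        // discussed in this thread.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')");
    }
}

In the SQL client, the same switch is the `SET table.sql-dialect=hive;` /
`SET table.sql-dialect=default;` statement pair that yuxia mentions.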


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-30 Thread João Boto
Hi Lijie,

I left both options so you can use whichever you prefer, but I can clean up the 
FLIP to keep only one.

Updated the FLIP

Regards

On 2023/05/23 07:23:45 Lijie Wang wrote:
> Hi Joao,
> 
> I noticed the FLIP currently contains the following 2 methods about type
> serializer:
> 
> (1)  TypeSerializer<InputT> createInputSerializer();
> (2)  <T> TypeSerializer<T> createSerializer(TypeInformation<T> inType);
> 
> Is the method (2) still needed now?
> 
> Best,
> Lijie
> 
> João Boto  于2023年5月19日周五 16:53写道:
> 
> > Updated the FLIP to use this option.
> >
> 


Fwd: Parquet file sink to Azure blob

2023-05-30 Thread Eli Golin
I have defined the following sink:

object ParquetSink {

  def parquetFileSink[A <: Message: ClassTag](
      assigner: A => String,
      config: Config
  )(implicit lc: LoggingConfigs): FileSink[A] = {

    val bucketAssigner = new BucketAssigner[A, String] {
      override def getBucketId(element: A, context: BucketAssigner.Context): String = {
        val path = assigner(element)
        logger.info(LogMessage(-1, s"Writing file to ${config.getString(baseDirKey)}/$path", "NA"))
        path
      }

      override def getSerializer: SimpleVersionedSerializer[String] =
        SimpleVersionedStringSerializer.INSTANCE
    }

    def builder(outFile: OutputFile): ParquetWriter[A] =
      new ParquetProtoWriters.ParquetProtoWriterBuilder(
        outFile,
        implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
      ).withCompressionCodec(config.getCompression(compressionKey)).build()

    val parquetBuilder: ParquetBuilder[A] = path => builder(path)

    FileSink
      .forBulkFormat(
        new Path(s"wasbs://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.blob.core.windows.net"),
        new ParquetWriterFactory[A](parquetBuilder)
      )
      .withBucketAssigner(bucketAssigner)
      .withOutputFileConfig(
        OutputFileConfig
          .builder()
          .withPartSuffix(".parquet")
          .build()
      )
      .build()
  }
}

After deploying the job I get the following exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS
    at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.checkSupportedFSSchemes(AzureBlobRecoverableWriter.java:44) ~[?:?]
    at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[?:?]
    at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.<init>(AzureBlobRecoverableWriter.java:37) ~[?:?]
    at org.apache.flink.fs.azurefs.AzureBlobFileSystem.createRecoverableWriter(AzureBlobFileSystem.java:44) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]

The question is whether I can make any changes to this code to make it work
with the wasbs protocol and not abfs?

Tnx in advance, Eli.
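
A note for readers hitting the same exception: the stack trace says that Flink's
Azure recoverable writer, which FileSink needs for its exactly-once part-file
handling, only supports the ABFS scheme(s). So, assuming the storage account can
be reached through its ADLS Gen2 / ABFS endpoint, one workaround sketch is to
point the sink at an abfss:// path instead of wasbs://, reusing the same config
keys, e.g.:

    new Path(s"abfss://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.dfs.core.windows.net")

As things stand, the wasbs:// (blob.core.windows.net) endpoint cannot be used with
FileSink, since no recoverable writer is available for it.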



Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Piotr Nowojski
Hi again,

Thanks Dong, yes I think your concerns are valid, and that's why I have
previously refined my idea to use one of the backpressure measuring metrics
that we already have.
Either simply `isBackPressured == true` check [1], or
`backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That would
address your three first concerns:
  - lack of event time
  - event time unreliability
  - lack of universal threshold value for `pendingRecords`

In a bit more detail, we probably should check (using [1] or [2]) either:
  a) if any of the source subtasks is backpressured
  b) if any of the subtasks is backpressured

In most cases a == b. The only time that's not true is if some windowed
operator in the middle of the job graph started triggering so many results
that it became backpressured,
but the backpressure didn't last long enough to propagate to sources. For
example that especially might occur if sources are idle. So probably b) is
a better and more generic option.

Regarding your last concern, with spiky traffic, I think the following
algorithm of triggering checkpoints would work pretty well:

public class BackpressureDetectingCheckpointTrigger {

    private long lastCheckpointTs = System.currentTimeMillis();
    private long slowCheckpointInterval = ...;
    private long fastCheckpointInterval = ...;

    // code executed periodically, for example once a second, once every 10ms,
    // or at 1/10th of the fast checkpoint interval
    void maybeTriggerCheckpoint(...) {

        // pick the target interval based on the current backpressure status
        long nextCheckpointTs = lastCheckpointTs;
        if (isAnySubtaskBackpressured()) {
            nextCheckpointTs += slowCheckpointInterval;
        } else {
            nextCheckpointTs += fastCheckpointInterval;
        }

        // trigger once the chosen interval has elapsed since the last checkpoint
        if (nextCheckpointTs <= System.currentTimeMillis()) {
            triggerCheckpoint();
            lastCheckpointTs = System.currentTimeMillis();
        }
    }
}

This way, if there is a spike of backpressure, it doesn't matter that much.
If the backpressure goes away by the next iteration, the next check will
trigger a checkpoint according to the
fast interval. The slow checkpoint interval will be used only if the
backpressure persists for the whole duration of the slowCheckpointInterval.

We could also go a little bit more fancy, and instead of using only fast or
slow intervals, we could use a continuous spectrum to gradually adjust the
interval, by replacing the first if/else
check with a weighted average:

  int maxBackPressureTime = getSubtaskMaxBackPressuredTimeMsPerSecond(); // 0..1000
  long nextCheckpointTs = lastCheckpointTs
      + (slowCheckpointInterval * maxBackPressureTime
          + fastCheckpointInterval * (1000 - maxBackPressureTime)) / 1000;

This would further eliminate some potential jitter and make the actual
checkpoint interval a bit more predictable.

Best,
Piotrek


Tue, 30 May 2023 at 04:40 Dong Lin  wrote:

> Let me correct the typo in the last paragraph as below:
>
> To make the problem even harder, the incoming traffic can be spiky. And the
> overhead of triggering checkpointing can be relatively low, in which case
> it might be more performant (w.r.t. e2e lag) for the Flink job to
> checkpoint at the more frequent interval in the continuous phase in face of
> a spike in the number of pending records buffered in the source operator.
>
>
> On Tue, May 30, 2023 at 9:17 AM Dong Lin  wrote:
>
> > Hi Piotrek,
> >
> > Thanks for providing more details of the alternative approach!
> >
> > If I understand your proposal correctly, here are the requirements for it
> > to work without incurring any regression:
> >
> > 1) The source needs a way to determine whether there exists backpressure.
> > 2) If there is backpressure, then it means e2e latency is already high
> > and there should be no harm to use the less frequent checkpointing
> interval.
> > 3) The configuration of the "less frequent checkpointing interval" needs
> > to be a job-level config so that it works for sources other than
> > HybridSource.
> >
> > I would say that if we can find a way for the source to determine the
> > "existence of backpressure" and meet the requirement 2), it would indeed
> be
> > a much more elegant approach that solves more use-cases.
> >
> > The devil is in the details. I am not sure how to determine the
> "existence
> > of backpressure". Let me explain my thoughts and maybe you can help
> > provide the answers.
> >
> > To make the discussion more concrete, let's say the input records do not
> > have event timestamps. Users want to checkpoint at least once every 30
> > minutes to upper-bound the amount of duplicate work after job failover.
> And
> > users want to checkpoint at least once every 30 seconds to upper-bound
> *extra
> > e2e lag introduced by the Flink job* during the continuous processing
> > phase.
> >
> > Since the input records do not have event timestamps, we can not rely on
> > metrics such as currentFetchEventTimeLag [1] to determine the absolute
> e2e
> > lag, because currentFetchEventTimeLag depends on the existence of event
> > timestamps.
> >
> > Also note that, even if the input recor

Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

2023-05-30 Thread yuxia
Hi, Jingsong.
Thanks for your feedback.

> Does this need to be a function call? Do you have some example?
I think it'll be useful to support function calls when users call a procedure.
The following example is from Iceberg: [1]
CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'));

It allows users to use `map('foo', 'bar')` to pass map data to the procedure.

Another case that I can imagine is rolling back a table to the snapshot of one 
week ago.
Then, with function calls, a user could call `rollback(table_name, now() - INTERVAL 
'7' DAY)` to achieve that.

Although the argument can be a function call, the parameter eventually received by 
the procedure will always be the evaluated literal.


> Procedure looks like a TableFunction, do you consider using Collector
something like TableFunction? (Supports large amount of data)

Yes, I had considered it. But returning T[] is for simplicity.

First, regarding how to return the result of calling a procedure, it looks more 
intuitive to me to use the return value of the `call` method instead of 
calling something like collector#collect.
Introducing a collector would add unnecessary complexity.

Second, regarding supporting large amounts of data, according to my investigation, 
I haven't seen a requirement for returning large amounts of data.
Iceberg also returns an array.[2] If you do think we should support large amounts 
of data, I think we can change the return type from T[] to Iterable.

[1]: https://iceberg.apache.org/docs/latest/spark-procedures/#migrate
[2]: 
https://github.com/apache/iceberg/blob/601c5af9b6abded79dabeba177331310d5487f43/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/Procedure.java#L44

Best regards,
Yuxia
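
To make the shape of the proposal a bit more concrete, here is a small,
self-contained sketch of what a connector-provided procedure could look like under
this FLIP as described above (a `call` method whose result is returned as an
array); the class and method names are illustrative assumptions, not the final API:

// Illustrative only: a hypothetical built-in procedure. The framework would
// evaluate CALL arguments (including function calls such as
// now() - INTERVAL '7' DAY) into literals before invoking it.
public class RollbackToTimestampProcedure {

    public String[] call(String tableName, java.time.Instant toTimestamp) {
        // ... talk to the external storage system and perform the rollback ...
        return new String[] {"rolled back " + tableName + " to " + toTimestamp};
    }
}

A user would then invoke it with the proposed syntax, along the lines of
CALL my_catalog.system.rollback_to_timestamp('db.sample', now() - INTERVAL '7' DAY).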

----- Original Message -----
From: "Jingsong Li" 
To: "dev" 
Sent: Monday, May 29, 2023 2:42:04 PM
Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

Thanks Yuxia for the proposal.

> CALL [catalog_name.][database_name.]procedure_name ([ expression [, 
> expression]* ] )

The expression can be a function call. Does this need to be a function
call? Do you have some example?

> Procedure returns T[]

Procedure looks like a TableFunction, do you consider using Collector
something like TableFunction? (Supports large amount of data)

Best,
Jingsong

On Mon, May 29, 2023 at 2:33 PM yuxia  wrote:
>
> Hi, everyone.
>
> I’d like to start a discussion about FLIP-311: Support Call Stored Procedure 
> [1]
>
> Stored procedure provides a convenient way to encapsulate complex logic to 
> perform data manipulation or administrative tasks in external storage 
> systems. It's widely used in traditional databases and popular compute 
> engines like Trino for its convenience. Therefore, we propose adding support 
> for call stored procedure in Flink to enable better integration with external 
> storage systems.
>
> With this FLIP, Flink will allow connector developers to develop their own 
> built-in stored procedures, and then enables users to call these predefined 
> stored procedures.
>
> Looking forward to your feedbacks.
>
> [1]: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
>
> Best regards,
> Yuxia


Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-05-30 Thread Samrat Deb
Hi Leonard,

> and I’m glad to help review the design as well as the code review.
Thank you so much. It would be really great and helpful to bring
flink-connector-redshift to Flink users :).

I have divided the implementation into 3 phases in the `Scope` section[1].
The 1st phase is to:

   - Integrate with the Flink Sink API (FLIP-171)


> About the implementation phases, How about prioritizing support for the
Datastream Sink API and TableSink API in the first phase?
I completely agree with prioritizing support for the Datastream
Sink API and TableSink API in the first phase.
I will update the FLIP[1] as you have suggested.

> It seems that the primary use cases for the Redshift connector are acting
as a sink for processed data by Flink.
Yes, the majority of asks and requirements for the Redshift connector are for a
sink for data processed by Flink.

Bests,
Samrat

On Tue, May 30, 2023 at 12:35 PM Leonard Xu  wrote:

> Thanks @Samrat for bringing this discussion.
>
> It makes sense to me to introduce AWS Redshift connector for Apache Flink,
> and I’m glad to help review the design as well as the code review.
>
> About the implementation phases, How about prioritizing support for the
> Datastream Sink API and TableSink API in the first phase? It seems that the
> primary use cases for the Redshift connector are acting as a sink for
> processed data by Flink.
>
> Best,
> Leonard
>
>
> > On May 29, 2023, at 12:51 PM, Samrat Deb  wrote:
> >
> > Hello all ,
> >
> > Context:
> > Amazon Redshift [1] is a fully managed, petabyte-scale data warehouse
> > service in the cloud. It allows analyzing data without all of the
> > configurations of a provisioned data warehouse. Resources are
> automatically
> > provisioned and data warehouse capacity is intelligently scaled to
> deliver
> > fast performance for even the most demanding and unpredictable workloads.
> > Redshift is one of the widely used warehouse solutions in the current
> > market.
> >
> > Building flink connector redshift will allow flink users to have source
> and
> > sink directly to redshift. It will help flink to expand the scope to
> > redshift as a new connector in the ecosystem.
> >
> > I would like to start a discussion on the FLIP-307: Flink connector
> > redshift [2].
> > Looking forward to comments, feedbacks and suggestions from the community
> > on the proposal.
> >
> > [1] https://docs.aws.amazon.com/redshift/latest/mgmt/welcome.html
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
> >
> >
> >
> > Bests,
> > Samrat
>
>


Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-05-30 Thread Samrat Deb
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift

[note] Missed the trailing link in the previous mail



On Tue, May 30, 2023 at 2:43 PM Samrat Deb  wrote:

> Hi Leonard,
>
> > and I’m glad to help review the design as well as the code review.
> Thank you so much. It would be really great and helpful to bring
> flink-connector-redshift for flink users :) .
>
> I have divided the implementation into 3 phases in the `Scope` section[1].
> The 1st phase is to:
>
>    - Integrate with the Flink Sink API (FLIP-171)
>
>
> > About the implementation phases, How about prioritizing support for the
> Datastream Sink API and TableSink API in the first phase?
> I can completely resonate with you to prioritize support for Datastream
> Sink API and TableSink API in the first phase.
> I will update the FLIP[1] as you have suggested.
>
> > It seems that the primary use cases for the Redshift connector are
> acting as a sink for processed data by Flink.
> Yes, the majority of asks and requirements for the Redshift connector are for a
> sink for data processed by Flink.
>
> Bests,
> Samrat
>
> On Tue, May 30, 2023 at 12:35 PM Leonard Xu  wrote:
>
>> Thanks @Samrat for bringing this discussion.
>>
>> It makes sense to me to introduce AWS Redshift connector for Apache
>> Flink, and I’m glad to help review the design as well as the code review.
>>
>> About the implementation phases, How about prioritizing support for the
>> Datastream Sink API and TableSink API in the first phase? It seems that the
>> primary use cases for the Redshift connector are acting as a sink for
>> processed data by Flink.
>>
>> Best,
>> Leonard
>>
>>
>> > On May 29, 2023, at 12:51 PM, Samrat Deb  wrote:
>> >
>> > Hello all ,
>> >
>> > Context:
>> > Amazon Redshift [1] is a fully managed, petabyte-scale data warehouse
>> > service in the cloud. It allows analyzing data without all of the
>> > configurations of a provisioned data warehouse. Resources are
>> automatically
>> > provisioned and data warehouse capacity is intelligently scaled to
>> deliver
>> > fast performance for even the most demanding and unpredictable
>> workloads.
>> > Redshift is one of the widely used warehouse solutions in the current
>> > market.
>> >
>> > Building flink connector redshift will allow flink users to have source
>> and
>> > sink directly to redshift. It will help flink to expand the scope to
>> > redshift as a new connector in the ecosystem.
>> >
>> > I would like to start a discussion on the FLIP-307: Flink connector
>> > redshift [2].
>> > Looking forward to comments, feedbacks and suggestions from the
>> community
>> > on the proposal.
>> >
>> > [1] https://docs.aws.amazon.com/redshift/latest/mgmt/welcome.html
>> > [2]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
>> >
>> >
>> >
>> > Bests,
>> > Samrat
>>
>>


[jira] [Created] (FLINK-32217) Retain metric store can cause NPE

2023-05-30 Thread Junrui Li (Jira)
Junrui Li created FLINK-32217:
-

 Summary: Retain metric store can cause NPE 
 Key: FLINK-32217
 URL: https://issues.apache.org/jira/browse/FLINK-32217
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.17.0
Reporter: Junrui Li
 Fix For: 1.18.0, 1.16.3, 1.17.2


When the MetricFetcher fetches metrics, it updates the MetricStore 
([here|https://github.com/apache/flink/blob/d6c3d332340922c24d1af9dd8835d0bf790184b5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java#LL91C44-L91C44]).
 But in this method it can get a null metric store and cause an NPE, which leads 
to incorrect results when retaining metrics; we should also fix it from the 
perspective of stability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-05-30 Thread Tamir Sagi
Hey Chesnay,

I'm sending a follow up email regarding JDK 17 support.

I see the Epic[1] is in progress and frequently updated. I'm curious whether there is 
an ETA or any plan for a major release with JDK 17 support that is not 
backward compatible.

Thanks,
Tamir


[1] https://issues.apache.org/jira/browse/FLINK-15736

From: Alexey Novakov 
Sent: Friday, April 28, 2023 11:41 AM
To: Chesnay Schepler 
Cc: Thomas Weise ; dev@flink.apache.org 
; Jing Ge ; Tamir Sagi 
; Piotr Nowojski ; Alexis 
Sarda-Espinosa ; user 
Subject: Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)


EXTERNAL EMAIL


If breaking savepoint compatibility eventually becomes an option, I would 
recommend trying to upgrade Flink's Scala even to 2.13.

Best regards,
Alexey

On Fri, Apr 28, 2023 at 10:22 AM Chesnay Schepler 
mailto:ches...@apache.org>> wrote:
We don't know yet. I wanted to run some more experiments to see if I can't get 
Scala 2.12.7 working on Java 17.

If that doesn't work, then it would also be an option to bump Scala in the Java 
17 builds (breaking savepoint compatibility), and users should just only use 
the Java APIs.

The alternative to _that_ is doing this when we drop the Scala API.

On 28/04/2023 01:11, Thomas Weise wrote:
Is the intention to bump the Flink major version and only support Java 17+? If 
so, can Scala not be upgraded at the same time?

Thanks,
Thomas


On Thu, Apr 27, 2023 at 4:53 PM Martijn Visser 
mailto:martijnvis...@apache.org>> wrote:
Scala 2.12.7 doesn't compile on Java 17, see
https://issues.apache.org/jira/browse/FLINK-25000.

On Thu, Apr 27, 2023 at 3:11 PM Jing Ge 
mailto:j...@ververica.com>> wrote:

> Thanks Tamir for the information. According to the latest comment of the
> task FLINK-24998, this bug should be gone while using the latest JDK 17. I
> was wondering whether it means that there are no more issues to stop us
> releasing a major Flink version to support Java 17? Did I miss something?
>
> Best regards,
> Jing
>
> On Thu, Apr 27, 2023 at 8:18 AM Tamir Sagi 
> mailto:tamir.s...@niceactimize.com>>
> wrote:
>
>> More details about the JDK bug here
>> https://bugs.openjdk.org/browse/JDK-8277529
>>
>> Related Jira ticket
>> https://issues.apache.org/jira/browse/FLINK-24998
>>
>> --
>> *From:* Jing Ge via user 
>> mailto:u...@flink.apache.org>>
>> *Sent:* Monday, April 24, 2023 11:15 PM
>> *To:* Chesnay Schepler mailto:ches...@apache.org>>
>> *Cc:* Piotr Nowojski mailto:pnowoj...@apache.org>>; 
>> Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com>; Martijn Visser 
>> mailto:martijnvis...@apache.org>>;
>> dev@flink.apache.org 
>> mailto:dev@flink.apache.org>>; user 
>> mailto:u...@flink.apache.org>>
>> *Subject:* Re: [Discussion] - Release major Flink version to support JDK
>> 17 (LTS)
>>
>>
>> *EXTERNAL EMAIL*
>>
>>
>> Thanks Chesnay for working on this. Would you like to share more info
>> about the JDK bug?
>>
>> Best regards,
>> Jing
>>
>> On Mon, Apr 24, 2023 at 11:39 AM Chesnay Schepler 
>> mailto:ches...@apache.org>>
>> wrote:
>>
>> As it turns out Kryo isn't a blocker; we ran into a JDK bug.
>>
>> On 31/03/2023 08:57, Chesnay Schepler wrote:
>>
>>
>> https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5#migration-guide
>>
>> Kryo themselves state that v5 likely can't read v2 data.
>>
>> However, both versions can be on the classpath without clashing, as v5
>> offers a versioned artifact that includes the version in the package.
>>
>> It probably wouldn't be difficult to migrate a savepoint to Kryo v5,
>> purely from a read/write perspective.
>>
>> The bigger question is how we expose this new Kryo version in the API. If
>> we stick to the versioned jar we need to either duplicate all current
>> Kryo-related APIs or find a better way to integrate other serialization
>> stacks.
>> On 30/03/2023 17:50, Piotr Nowojski wrote:
>>
>> Hey,
>>
>> > 1. The Flink community agrees that we upgrade Kryo to a later version,
>> which means breaking all checkpoint/savepoint compatibility and releasing a
>> Flink 2.0 with Java 17 support added and Java 8 and Flink Scala API support
>> dropped. This is probably the quickest way, but would still mean that we
>> expose Kryo in the Flink APIs, which is the main reason why we haven't been
>> able to upgrade Kryo at all.
>>
>> This sounds pretty bad to me.
>>
>> Has anyone looked into what it would take to provide a smooth migration
>> from Kryo2 -> Kryo5?
>>
>> Best,
>> Piotrek
>>
>> czw., 30 mar 2023 o 16:54 Alexis Sarda-Espinosa 
>> mailto:sarda.espin...@gmail.com>>
>> napisał(a):
>>
>> Hi Martijn,
>>
>> just to be sure, if all state-related classes use a POJO serializer, Kryo
>> will never come into play, right? Given FLINK-16686 [1], I wonder how many
>> users actually have jobs with Kryo and RocksDB, but even if there aren't
>> many, that still leaves those who don't use RocksDB for
>> checkpoin

[SUMMARY] Flink 1.18 Release Sync 05/30/2023

2023-05-30 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from the release sync of 1.18 on May 30.

1. @developers please update the progress of your features on 1.18 release
wiki page [1] ! That will help us a lot to have an overview of the entire
release cycle.

2. We found a JIRA issue (FLINK-18356) [2] that doesn't have an assignee,
which is a CI instability of the flink-table-planner module. It'll be nice
if someone in the community could pick it up and make some investigations
:-)

There are 6 weeks before the feature freeze date (Jul 11). The next release
sync will be on Jun 13, 2023. Welcome to join us [3]!

[1] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release
[2] https://issues.apache.org/jira/browse/FLINK-18356
[3] Zoom meeting:
https://us04web.zoom.us/j/79158702091?pwd=8CXPqxMzbabWkma5b0qFXI1IcLbxBh.1

Best regards,
Jing, Konstantin, Sergey and Qingsheng


Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

2023-05-30 Thread Jingsong Li
Thanks for your explanation.

We can support Iterable in future. Current design looks good to me.

Best,
Jingsong

On Tue, May 30, 2023 at 4:56 PM yuxia  wrote:
>
> Hi, Jingsong.
> Thanks for your feedback.
>
> > Does this need to be a function call? Do you have some example?
> I think it'll be useful to support function call when user call procedure.
> The following example is from iceberg:[1]
> CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 
> 'bar'));
>
> It allows users to use `map('foo', 'bar')` to pass map data to the procedure.
>
> Another case that I can imagine is rolling back a table to the snapshot of 
> one week ago.
> Then, with function calls, a user could call `rollback(table_name, now() - 
> INTERVAL '7' DAY)` to achieve that.
>
> Although the argument can be a function call, the parameter eventually received 
> by the procedure will always be the evaluated literal.
>
>
> > Procedure looks like a TableFunction, do you consider using Collector
> something like TableFunction? (Supports large amount of data)
>
> Yes, I had considered it. But returning T[] is for simplicity.
>
> First, regarding how to return the calling result of a procedure, it looks 
> more intuitive to me to use the return result of the `call` method instead of 
> by calling something like collector#collect.
> Introducing a collector would add unnecessary complexity.
>
> Second, regarding supporting large amounts of data, according to my 
> investigation, I haven't seen a requirement for returning large 
> amounts of data.
> Iceberg also returns an array.[2] If you do think we should support large 
> amounts of data, I think we can change the return type from T[] to Iterable.
>
> [1]: https://iceberg.apache.org/docs/latest/spark-procedures/#migrate
> [2]: 
> https://github.com/apache/iceberg/blob/601c5af9b6abded79dabeba177331310d5487f43/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/Procedure.java#L44
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Jingsong Li" 
> To: "dev" 
> Sent: Monday, May 29, 2023 2:42:04 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>
> Thanks Yuxia for the proposal.
>
> > CALL [catalog_name.][database_name.]procedure_name ([ expression [, 
> > expression]* ] )
>
> The expression can be a function call. Does this need to be a function
> call? Do you have some example?
>
> > Procedure returns T[]
>
> Procedure looks like a TableFunction, do you consider using Collector
> something like TableFunction? (Supports large amount of data)
>
> Best,
> Jingsong
>
> On Mon, May 29, 2023 at 2:33 PM yuxia  wrote:
> >
> > Hi, everyone.
> >
> > I’d like to start a discussion about FLIP-311: Support Call Stored 
> > Procedure [1]
> >
> > Stored procedure provides a convenient way to encapsulate complex logic to 
> > perform data manipulation or administrative tasks in external storage 
> > systems. It's widely used in traditional databases and popular compute 
> > engines like Trino for its convenience. Therefore, we propose adding 
> > support for call stored procedure in Flink to enable better integration 
> > with external storage systems.
> >
> > With this FLIP, Flink will allow connector developers to develop their own 
> > built-in stored procedures, and then enables users to call these predefined 
> > stored procedures.
> >
> > Looking forward to your feedbacks.
> >
> > [1]: 
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> >
> > Best regards,
> > Yuxia


[jira] [Created] (FLINK-32218) Implement support for parent/child shard ordering

2023-05-30 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-32218:
---

 Summary: Implement support for parent/child shard ordering
 Key: FLINK-32218
 URL: https://issues.apache.org/jira/browse/FLINK-32218
 Project: Flink
  Issue Type: Sub-task
Reporter: Hong Liang Teoh


Implement support for parent/child shard ordering in the KDS connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Dong Lin
Hi Piotr,

Thank you for providing those details.

I understand you suggested using the existing "isBackPressured" signal to
determine whether we should use the less frequent checkpointing interval. I
followed your thoughts and tried to make it work. Below are the issues that
I am not able to address. Can you see if there is a way to address these
issues?

Let's use the following use case to make the discussion more concrete:
a) Users want to checkpoint at least once every 30 minutes to upper-bound
the amount of duplicate work after job failover.
b) Users want to checkpoint at least once every 30 seconds to
upper-bound *extra
e2e lag introduced by the Flink job* during the continuous processing phase.

The suggested approach is designed to do this:
- If any of the source subtasks is backpressured, the job will checkpoint
at 30-minutes interval.
- If none of the source subtasks is backpressured, the job will checkpoint
at 30-seconds interval.

And we would need to add the following public APIs to implement this
approach:
- Add a job level config, maybe
execution.checkpointing.interval.no-backpressure. This is the checkpointing
interval when none of the source subtasks is backpressured.
- Add a public API for source operator subtasks to report their
backpressure status to the checkpointing coordinator. The subtask should
invoke this API whenever its backpressure status changed.

Now, in order to make the suggested approach work for all users (i.e. no
regression), we need to make sure that whenever we use the 30-minutes
checkpointing interval, the e2e latency will be less than or equal to the
case where we use the 30-seconds checkpointing interval.

I thought about this in detail, and found the following fabricated
scenarios where this approach might cause regression:

During the continuous processing phase, the input throughput is 5MBps for 1
minute, and 11MBps for 1 minute, in lock-step. The maximum throughput
achievable by this job is 10MBps. For simplicity, suppose the buffer size
can hold roughly 1 second worth of data; then the job is
backpressured roughly 1 minute out of every 2 minutes.

With the suggested approach, the e2e latency introduced by Flink is roughly
72 seconds. This is because it takes 1 minute for the 11MBps phase to end, and
another 12 seconds for the accumulated backlog to be cleared. And Flink cannot
checkpoint before the backlog is cleared.
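
(To spell out the arithmetic behind the 72 seconds under the assumptions above:
during the 1-minute 11MBps phase the job falls behind by (11 - 10) MBps * 60 s =
60 MB; once the input drops back to 5MBps, the backlog drains at (10 - 5) MBps,
which takes 60 MB / 5 MBps = 12 s; hence 60 s + 12 s = 72 s.)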

On the other hand, if we continue to checkpoint at 30-seconds interval, the
e2e latency introduced by Flink is at most 42 seconds, plus the extra delay
introduced by the checkpoint overhead. The e2e latency will be better than
the suggested approach, if the impact of the checkpoint is less than 30
seconds.

I think the root cause of this issue is that the decision of the
checkpointing interval really depends on the expected impact of a
checkpoint on the throughput. For example, if the checkpointing overhead is
close to none, then it is beneficial to the e2e latency to still checkpoint
a high frequency even if there exists (intermittent) backpressure.

Here is another fabricated use-case where the suggested approach might
cause regression. Let's say user's job is
*hybridSource.keyBy(...).transform(operatorA).sinkTo(PaimonSink)*. The
parallelism is 2. As we can see, there is all-to-all edge between source
and operatorA. And due to limited resources (e.g. buffer), at any given
time, each operatorA subtask can only process data from one of its upstream
subtask at a time, meaning that the other upstream subtask will be
backpressured. So there might always be at least one source subtask that is
backpressured even though the job's throughput can catch up with the input
throughput. However, the suggested approach might end up always using the
less frequent checkpointing interval in this case.

Suppose we can find a way to address the above issues, another issue with
the suggested approach is the extra communication overhead between the
source operator subtasks and the checkpointing coordinator. The source
subtask needs to send a message to checkpointing coordinator whenever its
backpressure status changes. The more frequently we check (e.g. once every
10 ms), the larger the overhead. And if we check not so frequently (e.g.
once every second), we might be more vulnerable to random/occasional
backpressure. So there seems to be tradeoff between the reliability and the
cost of this approach.

Thanks again for the suggestion. I am looking forward to your comments.

Best,
Dong


On Tue, May 30, 2023 at 4:37 PM Piotr Nowojski  wrote:

> Hi again,
>
> Thanks Dong, yes I think your concerns are valid, and that's why I have
> previously refined my idea to use one of the backpressure measuring metrics
> that we already have.
> Either simply `isBackPressured == true` check [1], or
> `backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That would
> address your three first concerns:
>   - lack of event time
>   - event time unreliability
>   - lack of 

Re: [DISCUSS] Status of Statefun Project

2023-05-30 Thread Galen Warren
Getting to a resolution here would be great and much appreciated, yes.

On Sat, May 27, 2023 at 1:03 AM Salva Alcántara 
wrote:

> Hey Galen,
>
> I took a look at StateFun some time ago; not using it in production but I
> agree that it would be a pity to abandon it.
>
> As Martijn said, let's be clear on what lies ahead (what option is finally
> picked) and how to contribute (if possible) moving forward.
>
> Regards,
>
> Salva
>
> On Sat, May 27, 2023 at 4:05 AM Galen Warren via user <
> u...@flink.apache.org> wrote:
>
>> Ok, I get it. No interest.
>>
>> If this project is being abandoned, I guess I'll work with my own fork.
>> Is there anything I should consider here? Can I share it with other people
>> who use this project?
>>
>> On Tue, May 16, 2023 at 10:50 AM Galen Warren 
>> wrote:
>>
>>> Hi Martijn, since you opened this discussion thread, I'm curious what
>>> your thoughts are in light of the responses? Thanks.
>>>
>>> On Wed, Apr 19, 2023 at 1:21 PM Galen Warren 
>>> wrote:
>>>
 I use Apache Flink for stream processing, and StateFun as a hand-off
> point for the rest of the application.
> It serves well as a bridge between a Flink Streaming job and
> micro-services.


 This is essentially how I use it as well, and I would also be sad to
 see it sunsetted. It works well; I don't know that there is a lot of new
 development required, but if there are no new Statefun releases, then
 Statefun can only be used with older Flink versions.

 On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos <
 mvillalo...@kineteque.com> wrote:

> I am currently using Stateful Functions in my application.
>
> I use Apache Flink for stream processing, and StateFun as a hand-off
> point for the rest of the application.
> It serves well as a bridge between a Flink Streaming job and
> micro-services.
>
> I would be disappointed if StateFun was sunsetted. It's a good idea.
>
> If there is anything I can do to help, as a contributor perhaps,
> please let me know.
>
> > On Apr 3, 2023, at 2:02 AM, Martijn Visser 
> wrote:
> >
> > Hi everyone,
> >
> > I want to open a discussion on the status of the Statefun Project
> [1] in Apache Flink. As you might have noticed, there hasn't been much
> development over the past months in the Statefun repository [2]. There is
> currently a lack of active contributors and committers who are able to 
> help
> with the maintenance of the project.
> >
> > In order to improve the situation, we need to solve the lack of
> committers and the lack of contributors.
> >
> > On the lack of committers:
> >
> > 1. Ideally, there are some of the current Flink committers who have
> the bandwidth and can help with reviewing PRs and merging them.
> > 2. If that's not an option, it could be a consideration that current
> committers only approve and review PRs, that are approved by those who are
> willing to contribute to Statefun and if the CI passes
> >
> > On the lack of contributors:
> >
> > 3. Next to having this discussion on the Dev and User mailing list,
> we can also create a blog with a call for new contributors on the Flink
> project website, send out some tweets on the Flink / Statefun twitter
> accounts, post messages on Slack etc. In that message, we would inform how
> those that are interested in contributing can start and where they could
> reach out for more information.
> >
> > There's also option 4. where a group of interested people would
> split Statefun from the Flink project and make it a separate top level
> project under the Apache Flink umbrella (similar as recently has happened
> with Flink Table Store, which has become Apache Paimon).
> >
> > If we see no improvements in the coming period, we should consider
> sunsetting Statefun and communicate that clearly to the users.
> >
> > I'm looking forward to your thoughts.
> >
> > Best regards,
> >
> > Martijn
> >
> > [1] https://nightlies.apache.org/flink/flink-statefun-docs-master/ <
> https://nightlies.apache.org/flink/flink-statefun-docs-master/>
> > [2] https://github.com/apache/flink-statefun <
> https://github.com/apache/flink-statefun>
>



Re: [SUMMARY] Flink 1.18 Release Sync 05/30/2023

2023-05-30 Thread Jing Ge
Thanks Qingsheng for driving it!

@Devs
As you might already be aware, there have been many externalizations and new
releases of Flink connectors. Once a connector has been externalized
successfully, i.e. the related module has been removed from the Flink repo,
we will not set a priority higher than Major on tasks related to those
connectors.

Best regards,
Jing

On Tue, May 30, 2023 at 11:48 AM Qingsheng Ren  wrote:

> Hi devs and users,
>
> I'd like to share some highlights from the release sync of 1.18 on May 30.
>
> 1. @developers please update the progress of your features on 1.18 release
> wiki page [1] ! That will help us a lot to have an overview of the entire
> release cycle.
>
> 2. We found a JIRA issue (FLINK-18356) [2] that doesn't have an assignee,
> which is a CI instability of the flink-table-planner module. It'll be nice
> if someone in the community could pick it up and make some investigations
> :-)
>
> There are 6 weeks before the feature freeze date (Jul 11). The next release
> sync will be on Jun 13, 2023. Welcome to join us [3]!
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release
> [2] https://issues.apache.org/jira/browse/FLINK-18356
> [3] Zoom meeting:
> https://us04web.zoom.us/j/79158702091?pwd=8CXPqxMzbabWkma5b0qFXI1IcLbxBh.1
>
> Best regards,
> Jing, Konstantin, Sergey and Qingsheng
>


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-30 Thread Weihua Hu
Thanks Paul for the proposal.

+1 for this. It is valuable in improving ease of use.

I have a few questions.
- Is SQLRunner a better name? We use this to run a SQL job. (Not a strong
opinion; SQLDriver is fine for me.)
- Could we run SQL jobs using SQL given as strings? Otherwise, we need to prepare
a SQL file in an image for Kubernetes application mode, which may be a bit
cumbersome (see the sketch below).
- I noticed that we don't specify the SQLDriver jar in the "run-application"
command. Does that mean we need to perform automatic detection in Flink?
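
For context on the second and third questions, a rough sketch of what an
application-mode submission with such a SQL driver might look like; only
`run-application -t kubernetes-application` and the -D options are existing CLI
syntax, while the driver jar path and its `--sql-file` argument are purely
hypothetical placeholders, since the FLIP is still under discussion:

    ./bin/flink run-application -t kubernetes-application \
        -Dkubernetes.cluster-id=my-sql-job \
        -Dkubernetes.container.image=my-registry/flink-with-sql:latest \
        local:///opt/flink/usrlib/flink-sql-driver.jar \
        --sql-file /opt/flink/usrlib/job.sql

Passing the SQL as a string argument instead of a file would avoid baking the
statement into the image, which is exactly the trade-off raised above.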


Best,
Weihua


On Mon, May 29, 2023 at 7:24 PM Paul Lam  wrote:

> Hi team,
>
> I’d like to start a discussion about FLIP-316 [1], which introduces a SQL
> driver as the
> default main class for Flink SQL jobs.
>
> Currently, Flink SQL could be executed out of the box either via SQL
> Client/Gateway
> or embedded in a Flink Java/Python program.
>
> However, each one has its drawback:
>
> - SQL Client/Gateway doesn’t support the application deployment mode [2]
> - Flink Java/Python program requires extra work to write a non-SQL program
>
> Therefore, I propose adding a SQL driver to act as the default main class
> for SQL jobs.
> Please see the FLIP docs for details and feel free to comment. Thanks!
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> >
> [2] https://issues.apache.org/jira/browse/FLINK-26541 <
> https://issues.apache.org/jira/browse/FLINK-26541>
>
> Best,
> Paul Lam


Re: Job stuck in CREATED state with scheduling failures

2023-05-30 Thread Matthias Pohl
Hi Gyula,
Could you share the logs in the ML? Or is there a Jira issue I missed?

Matthias

On Wed, May 17, 2023 at 9:33 PM Gyula Fóra  wrote:

> Hey Devs!
>
> I am bumping this thread to see if someone has any ideas how to go about
> solving this.
>
> Yang Wang earlier had this comment but I am not sure how to proceed:
>
> "From the logs you have provided, I find a potential bug in the current
> leader retrieval. In DefaultLeaderRetrievalService , if the leader
> information does not change, we will not notify the listener. It is indeed
> correct in all-most scenarios and could save some following heavy
> operations. But in the current case, it might be the root cause. For TM1,
> we added 0002 for job leader monitoring at
> 2023-01-18 05:31:23,848. However, we never get the next expected log
> “Resolved JobManager address, beginning registration”. It just because the
> leader information does not change. So the TM1 got stuck at waiting for the
> leader and never registered to the JM. Finally, the job failed with no
> enough slots."
>
> I wonder if someone could maybe confirm the current behaviour.
>
> Thanks
> Gyula
>
> On Mon, Jan 23, 2023 at 4:06 PM Tamir Sagi 
> wrote:
>
>> Hey Gyula,
>>
>> We encountered similar issues recently . Our Flink stream application
>> clusters(v1.15.2) are running in AWS EKS.
>>
>>
>>1. TM gets disconnected sporadically and never returns.
>>
>> org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with
>> id aml-rule-eval-stream-taskmanager-1-1 is no longer reachable.
>>
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1387)
>>
>> at
>> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>>
>> heartbeat.timeout is set to 15 minutes.
>>
>>
>> There are some heartbeat updates on Flink web-UI
>>
>>
>> There are not enough logs about it and no indication of OOM whatsoever
>> within k8s. However, We increased the TMs' memory, and the issue seems to
>> be resolved for now. (yet, it might hide a bigger issue).
>>
>> The 2nd issue is regarding  'NoResourceAvailableException' with the
>> following error message
>> Caused by:
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Slot request bulk is not fulfillable! Could not allocate the required slot
>> within slot request timeout (Enclosed log files.)
>>
>> I also found this unresolved ticket [1] with suggestion by @Yang Wang
>>  which seems to be working so far.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-25649
>>
>> Any thoughts?
>>
>> Thanks,
>> Tamir.
>>
>> --
>> *From:* Gyula Fóra 
>> *Sent:* Sunday, January 22, 2023 12:43 AM
>> *To:* user 
>> *Subject:* Job stuck in CREATED state with scheduling failures
>>
>>
>> *EXTERNAL EMAIL*
>>
>>
>> Hi Devs!
>>
>> We noticed a very strange failure scenario a few times recently with the
>> Native Kubernetes integration.
>>
>> The issue is triggered by a heartbeat timeout (a temporary network
>> problem). We observe the following behaviour:
>>
>> ===
>> 3 pods (1 JM, 2 TMs), Flink 1.15 (Kubernetes Native Integration):
>>
>> 1. Temporary network problem
>>  - Heartbeat failure, TM1 loses JM connection and JM loses TM1 connection.
>>  - Both the JM and TM1 trigger the job failure on their sides and cancel
>> the tasks
>>  - JM releases TM1 slots
>>
>> 2. While failing/cancelling the job, the network connection recovers and
>> TM1 reconnects to JM:
>> TM1: Resolved JobManager address, beginning registration
>>
>> 3. JM tries to resubmit the job using TM1 + TM2 but the scheduler keeps
>> failing as it cannot seem to allocate all the resources:
>>
>> NoResourceAvailableException: Slot request bulk is not fulfillable!
>> Could not allocate the required slot within slot request timeout
>> On TM1 we see the following logs repeating (multiple times every few
>> seconds until the slot request times out after 5 minutes):
>> Receive slot request ... for job ... from resource manager with leader
>> id ...
>> Allocated slot for ...
>> Receive slot request ... for job ... from resource manager with leader
>> id ...
>> Allocated slot for ...
>> Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
>> ResourceProfile{...}, allocationId: ..., jobId: ...).
>>
>> While all these are happening on TM1 we don't see any allocation related
>> INFO logs on TM2.
>> ===
>>
>> Seems like something weird happens when TM1 reconnects after the
>> heartbeat loss. I feel that the JM should probably shut down the TM and
>> create a new one. But instead it gets stuck.
>>
>> Any ideas what could be happening here?
>>
>> Thanks
>> Gyula
>>
>>

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Jing Ge
Hi Piotr,


> But why do we need to have two separate mechanisms, if the dynamic
> adjustment based on the backpressure/backlog would
> achieve basically the same goal as your proposal and would solve both of
> the problems? Having two independent solutions
> in the same codebase, in the docs, that are achieving basically the same
> thing is far from ideal. It would increase both the
> complexity of the system and confuse potential users.
>

Great question! Well, my answer is: there is no silver bullet. Therefore I
don't see any problem if there is more than one mechanism to solve the
same issue, each from a different perspective. Even for solving the
checkpoint duration issue, we have unaligned checkpoints and buffer
debloating to address back pressure (again, same goal, two mechanisms), plus
incremental checkpointing and log-based incremental checkpoints.
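
For readers less familiar with those knobs: they are independent, user-facing
settings that can be combined freely. A small flink-conf.yaml sketch, assuming I
recall the Flink 1.17 option keys correctly:

    # shorten alignment time under backpressure
    execution.checkpointing.unaligned: true
    # shrink in-flight buffers under backpressure
    taskmanager.network.memory.buffer-debloat.enabled: true

Both target checkpoint duration under backpressure, yet they coexist as separate
mechanisms, which is the point being made here.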

If we take a look at the dynamic adjustment from a different perspective,
it could be considered as a tradeoff - we have to rely on the back pressure
metrics to do a pure technical performance improvement without knowing any
usage context, because we don't know the real requirement in advance. For
some use cases, it is the only choice, i.e. a tradeoff, but for other
cases, there are better ways based on the input information in advance.

As I mentioned previously, what if the user doesn't want to trigger too
many checkpoints (information known in advance), even if there is no back pressure
at all? What if the user doesn't want to change the checkpoint interval if
there is back pressure? Should users have the control, or should the
system just wildly adjust everything and ignore users' needs? BTW, if I am
not mistaken, your proposal is the third mechanism that will be mixed up
with the above-mentioned unaligned checkpoints and buffer debloating. All
of them affect checkpointing. Users will be confused.

Like I mentioned previously, data driven dynamic adjustments are a good
thing, we should have them. But if the statement is that dynamic adjustment
should be the one and only one mechanism, I think, we should reconsider it
again and carefully :-)

Best regards,
Jing

On Mon, May 29, 2023 at 5:23 PM Piotr Nowojski  wrote:

> Hi
>
> @Jing
>
> > Your proposal to dynamically adjust the checkpoint intervals is elegant!
> It
> > makes sense to build it as a generic feature in Flink. Looking forward to
> > it. However, for some user cases, e.g. when users were aware of the
> bounded
> > sources (in the HybridSource) and care more about the throughput, the
> > dynamic adjustment might not be required. Just let those bounded sources
> > always have larger checkpoint intervals even when there is no back
> > pressure. Because no one cares about latency in this case, let's turn off
> > the dynamic adjustment, reduce the checkpoint frequency, have better
> > throughput, and save unnecessary source consumption. Did I miss anything
> > here?
>
>

> But why do we need to have two separate mechanisms, if the dynamic
> adjustment based on the backpressure/backlog would
> achieve basically the same goal as your proposal and would solve both of
> the problems? Having two independent solutions
> in the same codebase, in the docs, that are achieving basically the same
> thing is far from ideal. It would increase both the
> complexity of the system and confuse potential users.
>


>
> Moreover, as I have already mentioned before, I don't like the current
> proposal as it's focusing ONLY on the HybridSource,
> which can lead to even worse problem in the future, where many different
> sources would have each a completely custom
> solution to solve the same/similar problems, complicating the system and
> confusing the users even more.
>
> @Dong,
>
> > For now I am not able to come up with a good way to support this. I am
> happy to discuss the
> > pros/cons if you can provide more detail (e.g. API design) regarding how
> to support this approach
>
> I have already described such proposal:
>
> > Piotr:
> > I don't know, maybe instead of adding this logic to operator
> coordinators, `CheckpointCoordinator` should have a pluggable
> `CheckpointTrigger`,
> > that the user could configure like a `MetricReporter`. The default one
> would be just periodically triggering checkpoints. Maybe
> > `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> `pendingRecords` for some source has exceeded the configured
> > threshold and based on that adjust the checkpointing interval
> accordingly? This would at least address some of my concerns.
>
> plus
>
> > Piotr:
> >  Either way, I would like to refine my earlier idea, and instead of using
> metrics like `pendingRecords`, I think we could switch between fast and
> > slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:
> >
> > As a user, I would like to have my regular fast checkpointing interval
> for low latency, but the moment my system is not keeping up, if the
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Piotr Nowojski
Hi Dong,

First of all, we don't need to send any extra signal from source (or non-source)
operators. All of the operators are already reporting backpressure
metrics [1]
and all of the metrics are already sent to JobManager. We would only need
to pass some accessor to the metrics to the `CheckpointTrigger`.

> execution.checkpointing.interval.no-backpressure

Maybe that's the way to go, but as I mentioned before, I could see this
`CheckpointTrigger` to be a pluggable component, that could have been
configured
the same way as `MetricReporters` are right now [2]. We could just provide
out of the box two plugins, one implementing current checkpoint triggering
strategy,
and the other using backpressure.
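
To illustrate what such a plugin could look like, here is a very rough sketch; every type and
method name below is hypothetical, since no `CheckpointTrigger` interface exists in Flink today:

```java
/** Purely illustrative sketch of a pluggable trigger; none of these types exist in Flink yet. */
public interface CheckpointTrigger {

    /**
     * Installed by the CheckpointCoordinator; the context is a hypothetical handle that exposes
     * read-only job metrics and a way to request checkpoints.
     */
    void start(Context context);

    void stop();

    interface Context {
        /** Read access to already-reported task metrics, e.g. backPressuredTimeMsPerSecond. */
        double getMetric(String taskId, String metricName);

        /** Asks the coordinator to trigger a checkpoint now. */
        void triggerCheckpoint();
    }
}
```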

> I think the root cause of this issue is that the decision of the
> checkpointing interval really depends on the expected impact of a
> checkpoint on the throughput.

Yes, I agree. Ideally we probably should adjust the checkpointing interval
based on measured latency, for example using latency markers [3], but that
would
require some investigation if latency markers are indeed that costly as
documented and if so optimizing them to solve the performance degradation
of enabling
e2e latency tracking.

However, the new back pressure monitoring strategy would be optional, users
could implement their own `CheckpointTrigger` if really needed, and I have a
feeling that there might be an even better solution (more
about that later), so I don't think this has to be solved right away.

> if the checkpointing overhead is
> close to none, then it is beneficial to the e2e latency to still
checkpoint
> a high frequency even if there exists (intermittent) backpressure.

In that case users could just configure a slow checkpointing interval to a
lower value, or just use static checkpoint interval strategy.

> With the suggested approach, the e2e latency introduced by Flink is
roughly
> 72 seconds. This is because it takes 1 minute for 11MBps phase to end, and
> another 12 seconds for the accumulated backlog to be cleared. And Flink
can
> not do checkpoint before the backlog is cleared.

Indeed that's a valid concern. After thinking more about this issue, maybe
the proper solution would be to calculate "how much overloaded is the most
overloaded subtask".
In this case, that would be 10% (we are trying to push 110% of the
available capacity in the current job/cluster). Then we could use that
number as some kind of weighted average.
We could figure out a function mapping the overload percentage, into a
floating point number from range [0, 1]

f(overload_factor) = weight // weight is from [0, 1]

and then the desired checkpoint interval would be something like

(1 - weight) * fastCheckpointInterval + weight * slowCheckpointInterval

In your problematic example, we would like the weight to be pretty small
(<10%?), so the calculated checkpoint interval would be pretty close to the
fastCheckpointInterval.

The overload factor we could calculate the same way as FLIP-271 is
calculating how much should we rescale given operator [4].
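
To make the calculation above concrete, a minimal sketch (all names are illustrative; the clamping
used as `f` is only a placeholder for whatever mapping function we would actually pick):

```java
/** Illustrative sketch of blending the two intervals by the overload factor; not existing Flink code. */
final class WeightedCheckpointInterval {

    private final long fastCheckpointIntervalMs; // e.g. 30 s, used when the job keeps up
    private final long slowCheckpointIntervalMs; // e.g. 30 min, used when fully overloaded

    WeightedCheckpointInterval(long fastCheckpointIntervalMs, long slowCheckpointIntervalMs) {
        this.fastCheckpointIntervalMs = fastCheckpointIntervalMs;
        this.slowCheckpointIntervalMs = slowCheckpointIntervalMs;
    }

    /**
     * overloadFactor is how much the most overloaded subtask exceeds the available capacity,
     * e.g. 0.10 when the job tries to push 110% of what it can process.
     */
    long intervalMs(double overloadFactor) {
        double weight = Math.min(1.0, Math.max(0.0, overloadFactor)); // placeholder for f(overload_factor)
        return (long) ((1 - weight) * fastCheckpointIntervalMs + weight * slowCheckpointIntervalMs);
    }
}
```

With a 30 s fast interval and a 30 min slow interval, an overload factor of 0.1 already yields
roughly 3.5 minutes under this naive clamping, so the real f() would probably need to stay much
closer to 0 for small overloads.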

I can think about this more and elaborate/refine this idea tomorrow.

Best,
Piotrek


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling

wt., 30 maj 2023 o 13:58 Dong Lin  napisał(a):

> Hi Piotr,
>
> Thank you for providing those details.
>
> I understand you suggested using the existing "isBackPressured" signal to
> determine whether we should use the less frequent checkpointing interval. I
> followed your thoughts and tried to make it work. Below are the issues that
> I am not able to address. Can you see if there is a way to address these
> issues?
>
> Let's will use the following use-case to make the discussion more concrete:
> a) Users want to checkpoint at least once every 30 minutes to upper-bound
> the amount of duplicate work after job failover.
> b) Users want to checkpoint at least once every 30 seconds to
> upper-bound *extra
> e2e lag introduced by the Flink job* during the continuous processing
> phase.
>
> The suggested approach is designed to do this:
> - If any of the source subtasks is backpressured, the job will checkpoint
> at 30-minutes interval.
> - If none of the source subtasks is backpressured, the job will checkpoint
> at 30-seconds interval.
>
> And we would need to add the following public APIs to implement this
> approach:
> - Add a job level config, maybe
> execution.checkpointing.interval.no-backpressure. This is the checkpointing
> interval when none of the source subtasks is backpressured.
> - Add a public API for source operator subtasks to report their
> backpressure status to the checkpointing coordinator. The subtask should
> invoke this API whenever its backpressure status changed.
>
> Now, in order to mak

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-30 Thread Tzu-Li (Gordon) Tai
Hi,

> I think we can get the serializer directly in InitContextImpl through
`getOperatorConfig().getTypeSerializerIn(0,
getUserCodeClassloader()).duplicate()`.

This should work, yes.
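
For reference, a rough sketch of how InitContextImpl could back the new method, assuming it is
constructed inside the sink operator (so `getOperatorConfig()` and `getUserCodeClassloader()` are
available there):

```java
// Sketch only: wiring createInputSerializer() to the operator config, as discussed above.
@SuppressWarnings("unchecked")
public <IN> TypeSerializer<IN> createInputSerializer() {
    return (TypeSerializer<IN>)
            getOperatorConfig()
                    .getTypeSerializerIn(0, getUserCodeClassloader())
                    .duplicate(); // duplicate so the sink gets its own, independently usable instance
}
```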

+1 to the updated FLIP so far. Thank you, Joao, for being on top of this!

Thanks,
Gordon

On Tue, May 30, 2023 at 12:34 AM João Boto  wrote:

> Hi Lijie,
>
> I left the two options to use whatever you want, but I can clean the FLIP
> to have only one..
>
> Updated the FLIP
>
> Regards
>
> On 2023/05/23 07:23:45 Lijie Wang wrote:
> > Hi Joao,
> >
> > I noticed the FLIP currently contains the following 2 methods about type
> > serializer:
> >
> > (1)  TypeSerializer createInputSerializer();
> > (2)  TypeSerializer createSerializer(TypeInformation inType);
> >
> > Is the method (2) still needed now?
> >
> > Best,
> > Lijie
> >
> > João Boto  于2023年5月19日周五 16:53写道:
> >
> > > Updated the FLIP to use this option.
> > >
> >
>


Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-30 Thread Shammon FY
Thanks Feng, the catalog modification listener is only used to report
read-only ddl information to other components or systems.

> 1. Will an exception thrown by the listener affect the normal execution
process?

Users need to handle the exception in the listener themselves. Many DDLs
such as drop tables and alter tables cannot be rolled back, so Flink cannot
handle these exceptions for the listener. An exception thrown from the listener
will cause the operation to exit, but the DDL that has already been executed will remain successful.

> 2. What is the order of execution? Is the listener executed first or are
specific operations executed first?  If I want to perform DDL permission
verification(such as integrating with Ranger based on the listener) , is
that possible?

The listener will be notified to report catalog modification after DDLs are
successful, so you can not do permission verification for DDL in the
listener. As mentioned above, Flink will not roll back the DDL even when
the listener throws an exception. I think permission verification is
another issue and can be discussed separately.
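
As a small illustration of the contract described above (the interface and event names follow the
current FLIP draft and may still change), a listener would typically guard its own reporting:

```java
/** Sketch only; interface/event names follow the current FLIP draft and are not final. */
public class ReportingCatalogModificationListener implements CatalogModificationListener {

    @Override
    public void onEvent(CatalogModificationEvent event) {
        try {
            // forward the already-executed DDL information to an external system, e.g. DataHub or Atlas
            reportToMetadataSystem(event);
        } catch (Exception e) {
            // the DDL has already succeeded and cannot be rolled back, so the listener has to
            // handle its own failures instead of propagating them back to the DDL execution
        }
    }

    private void reportToMetadataSystem(CatalogModificationEvent event) {
        // external-system-specific reporting goes here
    }
}
```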


Best,
Shammon FY

On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:

> Hi, Shammon
>
> Thanks for driving this Flip, [Support Customized Job Meta Data Listener]
> will  make it easier for Flink to collect lineage information.
> I fully agree with the overall solution and have a small question:
>
> 1. Will an exception thrown by the listener affect the normal execution
> process?
>
> 2. What is the order of execution? Is the listener executed first or are
> specific operations executed first?  If I want to perform DDL permission
> verification(such as integrating with Ranger based on the listener) , is
> that possible?
>
>
> Best,
> Feng
>
> On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
>
> > Hi devs,
> >
> > We would like to bring up a discussion about FLIP-294: Support Customized
> > Job Meta Data Listener[1]. We have had several discussions with Jark Wu,
> > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions and
> > interfaces, and thanks for their valuable advice.
> > The overall job and connector information is divided into metadata and
> > lineage, this FLIP focuses on metadata and lineage will be discussed in
> > another FLIP in the future. In this FLIP we want to add a customized
> > listener in Flink to report catalog modifications to external metadata
> > systems such as datahub[2] or atlas[3]. Users can view the specific
> > information of connectors such as source and sink for Flink jobs in these
> > systems, including fields, watermarks, partitions, etc.
> >
> > Looking forward to hearing from you, thanks.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > [2] https://datahub.io/
> > [3] https://atlas.apache.org/#/
> >
>


Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-05-30 Thread liu ron
Thanks for your proposal. I hadn't even noticed this fallback behavior, +1.

Best,
Ron

Jingsong Li  于2023年5月30日周二 15:23写道:

> +1, the fallback looks weird now, it is outdated.
>
> But, it is good to provide an option. I don't know if there are some
> users who depend on this fallback.
>
> Best,
> Jingsong
>
> On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
> >
> > +1, the fallback was just intended as a temporary workaround to run
> catalog/module related statements with hive dialect.
> >
> > On Mon, May 29, 2023 at 3:59 PM Benchao Li  wrote:
> >>
> >> Big +1 on this, thanks yuxia for driving this!
> >>
> >> yuxia  于2023年5月29日周一 14:55写道:
> >>
> >> > Hi, community.
> >> >
> >> > I want to start the discussion about Hive dialect shouldn't fall back
> to
> >> > Flink's default dialect.
> >> >
> >> > Currently, when the HiveParser fail to parse the sql in Hive dialect,
> >> > it'll fall back to Flink's default parser[1] to handle flink-specific
> >> > statements like "CREATE CATALOG xx with (xx);".
> >> >
> >> > As I‘m involving with Hive dialect and have some communication with
> >> > community users who use Hive dialectrecently,  I'm thinking throw
> exception
> >> > directly instead of falling back to Flink's default dialect when fail
> to
> >> > parse the sql in Hive dialect
> >> >
> >> > Here're some reasons:
> >> >
> >> > First of all, it'll hide some error with Hive dialect. For example, we
> >> > found we can't use Hive dialect any more with Flink sql client in
> release
> >> > validation phase[2], finally we find a modification in Flink sql
> client
> >> > cause it, but our test case can't find it earlier for although
> HiveParser
> >> > faill to parse it but then it'll fall back to default parser and pass
> test
> >> > case successfully.
> >> >
> >> > Second, conceptually, Hive dialect should be do nothing with Flink's
> >> > default dialect. They are two totally different dialect. If we do
> need a
> >> > dialect mixing Hive dialect and default dialect , may be we need to
> propose
> >> > a new hybrid dialect and announce the hybrid behavior to users.
> >> > Also, it made some users confused by the fallback behavior. The fact
> >> > comes from questions I had been asked by community users. Throwing an exception
> directly
> >> > when failing to parse the sql statement in Hive dialect will be more
> intuitive.
> >> >
> >> > Last but not least, it's important to decouple Hive from Flink planner[3]
> >> > before we can externalize Hive connector[4]. If we still fall back to
> Flink
> >> > default dialect, then we will need to depend on `ParserImpl` in Flink
> planner,
> >> > which will block us removing the provided dependency of Hive dialect
> as
> >> > well as externalizing Hive connector.
> >> >
> >> > Although we hadn't announced the fall back behavior ever, some
> users
> >> > may implicitly depend on this behavior in their sql jobs. So, I
> hereby
> >> > open the discussion about abandoning the fall back behavior to make
> Hive
> >> > dialect clear and isolated.
> >> > Please remember it won't break the Hive syntax but the syntax
> specified
> >> > to Flink may fail afterwards. But for the failed sql, you can use `SET
> >> > table.sql-dialect=default;` to switch to Flink dialect.
> >> > If there's some flink-specific statements we found should be included
> in
> >> > Hive dialect to be easy to use, I think we can still add them as
> specific
> >> > cases to Hive dialect.
> >> >
> >> > Look forwards to your feedback. I'd love to listen the feedback from
> >> > community to take the next steps.
> >> >
> >> > [1]:
> >> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
> >> > [2]:https://issues.apache.org/jira/browse/FLINK-26681
> >> > [3]:https://issues.apache.org/jira/browse/FLINK-31413
> >> > [4]:https://issues.apache.org/jira/browse/FLINK-30064
> >> >
> >> >
> >> >
> >> > Best regards,
> >> > Yuxia
> >> >
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >
> >
> >
> > --
> > Best regards!
> > Rui Li
>


Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-05-30 Thread liu ron
Hi, Samrat

Thanks for driving this FLIP. It looks like supporting
flink-connector-redshift is very useful to Flink. I have two questions:
1. Regarding the `read.mode` and `write.mode`, you say they provide two
modes respectively, jdbc and `unload or copy`. What is the default value
for `read.mode` and `write.mode`?
2. For Source, does it support both batch read and streaming read?


Best,
Ron

Samrat Deb  于2023年5月30日周二 17:15写道:

> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
>
> [note] Missed the trailing link for previous mail
>
>
>
> On Tue, May 30, 2023 at 2:43 PM Samrat Deb  wrote:
>
> > Hi Leonard,
> >
> > > and I’m glad to help review the design as well as the code review.
> > Thank you so much. It would be really great and helpful to bring
> > flink-connector-redshift for flink users :) .
> >
> > I have divided the implementation in 3 phases in the `Scope` Section[1].
> > 1st phase is to
> >
> >- Integrate with Flink Sink API (*FLIP-171*
> ><
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink>
> >)
> >
> >
> > > About the implementation phases, How about prioritizing support for the
> > Datastream Sink API and TableSink API in the first phase?
> > I can completely resonate with you to prioritize support for Datastream
> > Sink API and TableSink API in the first phase.
> > I will update the FLIP[1] as you have suggested.
> >
> > > It seems that the primary use cases for the Redshift connector are
> > acting as a sink for processed data by Flink.
> > Yes, majority ask and requirement for Redshift connector is sink for
> > processed data by Flink.
> >
> > Bests,
> > Samrat
> >
> > On Tue, May 30, 2023 at 12:35 PM Leonard Xu  wrote:
> >
> >> Thanks @Samrat for bringing this discussion.
> >>
> >> It makes sense to me to introduce AWS Redshift connector for Apache
> >> Flink, and I’m glad to help review the design as well as the code
> review.
> >>
> >> About the implementation phases, How about prioritizing support for the
> >> Datastream Sink API and TableSink API in the first phase? It seems that
> the
> >> primary use cases for the Redshift connector are acting as a sink for
> >> processed data by Flink.
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >> > On May 29, 2023, at 12:51 PM, Samrat Deb 
> wrote:
> >> >
> >> > Hello all ,
> >> >
> >> > Context:
> >> > Amazon Redshift [1] is a fully managed, petabyte-scale data warehouse
> >> > service in the cloud. It allows analyzing data without all of the
> >> > configurations of a provisioned data warehouse. Resources are
> >> automatically
> >> > provisioned and data warehouse capacity is intelligently scaled to
> >> deliver
> >> > fast performance for even the most demanding and unpredictable
> >> workloads.
> >> > Redshift is one of the widely used warehouse solutions in the current
> >> > market.
> >> >
> >> > Building flink connector redshift will allow flink users to have
> source
> >> and
> >> > sink directly to redshift. It will help flink to expand the scope to
> >> > redshift as a new connector in the ecosystem.
> >> >
> >> > I would like to start a discussion on the FLIP-307: Flink connector
> >> > redshift [2].
> >> > Looking forward to comments, feedbacks and suggestions from the
> >> community
> >> > on the proposal.
> >> >
> >> > [1] https://docs.aws.amazon.com/redshift/latest/mgmt/welcome.html
> >> > [2]
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
> >> >
> >> >
> >> >
> >> > Bests,
> >> > Samrat
> >>
> >>
>


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-30 Thread Shammon FY
Thanks Paul for driving this proposal.

I found the sql driver has no config related options. If I understand
correctly, the sql driver can be used to submit sql jobs in a 'job
submission service' such as sql-gateway. In general, in addition to the
default config for the Flink cluster which includes k8s, ha, etc., users may
also specify configurations for SQL jobs, including parallelism, number of
Task Managers, etc.

For example, in sql-gateway users can `set` dynamic parameters before
submitting a sql statement, and for sql files users may put their
configurations in a `yaml` file. Should the sql driver support dynamic
parameters and specifying a config file?

Best,
Shammon FY

On Tue, May 30, 2023 at 9:57 PM Weihua Hu  wrote:

> Thanks Paul for the proposal.
>
> +1 for this. It is valuable in improving ease of use.
>
> I have a few questions.
> - Is SQLRunner the better name? We use this to run a SQL Job. (Not strong,
> the SQLDriver is fine for me)
> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare
> a SQL file in an image for Kubernetes application mode, which may be a bit
> cumbersome.
> - I noticed that we don't specify the SQLDriver jar in the
> "run-application"
> command. Does that mean we need to perform automatic detection in Flink?
>
>
> Best,
> Weihua
>
>
> On Mon, May 29, 2023 at 7:24 PM Paul Lam  wrote:
>
> > Hi team,
> >
> > I’d like to start a discussion about FLIP-316 [1], which introduces a SQL
> > driver as the
> > default main class for Flink SQL jobs.
> >
> > Currently, Flink SQL could be executed out of the box either via SQL
> > Client/Gateway
> > or embedded in a Flink Java/Python program.
> >
> > However, each one has its drawback:
> >
> > - SQL Client/Gateway doesn’t support the application deployment mode [2]
> > - Flink Java/Python program requires extra work to write a non-SQL
> program
> >
> > Therefore, I propose adding a SQL driver to act as the default main class
> > for SQL jobs.
> > Please see the FLIP docs for details and feel free to comment. Thanks!
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> > >
> > [2] https://issues.apache.org/jira/browse/FLINK-26541 <
> > https://issues.apache.org/jira/browse/FLINK-26541>
> >
> > Best,
> > Paul Lam
>


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-30 Thread Shengkai Fang
Thanks for the proposal. The Application mode is very important to Flink
SQL. But I have some questions about the FLIP:

1. The FLIP does not specify the kind of SQL that will be submitted with
the application mode. I believe only a portion of the SQL will be delegated
to the SqlRunner.
2. Will the SQL Client/Gateway perform any validation before submitting the
SQL to the SqlRunner? If the user's SQL is invalid, it could take a long
time to fetch error messages before execution.
3. ExecNodeGraph VS SQL File

Initially, we planned to use ExecNodeGraph as the essential information to
be submitted. By enabling 'table.plan.compile.catalog-objects' = 'all', the
ExecNodeGraph provides necessary information about the session state.
ExecNodeGraph contains the basic structure of tables and the class name of
catalog functions used, which enables us to avoid serializing the catalog
to the remote. Therefore, we prefer to use ExecNodeGraph as the content
submitted. Furthermore, our internal implementation can extend ExecNodeGraph
beyond state compatibility.
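
For reference, this is roughly how that option is used today when producing a compiled plan that
can be shipped on its own (the table names are made up and must already be registered in the
session):

```java
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompilePlanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // embed catalog objects into the plan so the ExecNodeGraph carries the table information itself
        tEnv.getConfig().set("table.plan.compile.catalog-objects", "all");
        // sink_table and source_table are placeholders for tables registered in the session
        CompiledPlan plan = tEnv.compilePlanSql("INSERT INTO sink_table SELECT * FROM source_table");
        plan.writeToFile("/tmp/insert-plan.json");
    }
}
```

The written JSON plan would then be the artifact shipped to the cluster instead of the SQL text
plus the session state.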

4. SqlRunner VS ApplicationRunner

In FLIP-85, it mentions supporting a Library mode. Compared to adding a
new module, I think it's better to extend the original design. WDYT?

ApplicationRunner.run((StreamExecutionEnvironment env) -> {

  … // a consumer of the env

})

5. K8S Application mode

As far as I know, K8S doesn't support shipping multiple jars to the remote.
It seems the current design also doesn't support K8S Application mode?

https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L217

6. Could you add more details about your modification in the gateway side,
including error handling, execution workflow, and the impact on job
statements?

Best,
Shengkai

Shammon FY  于2023年5月31日周三 10:40写道:

> Thanks Paul for driving this proposal.
>
> I found the sql driver has no config related options. If I understand
> correctly, the sql driver can be used to submit sql jobs in a 'job
> submission service' such as sql-gateway. In general, in addition to the
> default config for Flink cluster which includes k8s, ha and .etc, users may
> also specify configurations for SQL jobs, including parallelism, number of
> Task Managers, etc.
>
> For example, in sql-gateway users can `set` dynamic parameters before
> submitting a sql statement, and for sql files users may put their
> configurations in a `yaml` file. Should sql driver support dynamic
> parameters and specify config file?
>
> Best,
> Shammon FY
>
> On Tue, May 30, 2023 at 9:57 PM Weihua Hu  wrote:
>
> > Thanks Paul for the proposal.
> >
> > +1 for this. It is valuable in improving ease of use.
> >
> > I have a few questions.
> > - Is SQLRunner the better name? We use this to run a SQL Job. (Not
> strong,
> > the SQLDriver is fine for me)
> > - Could we run SQL jobs using SQL in strings? Otherwise, we need to
> prepare
> > a SQL file in an image for Kubernetes application mode, which may be a
> bit
> > cumbersome.
> > - I noticed that we don't specify the SQLDriver jar in the
> > "run-application"
> > command. Does that mean we need to perform automatic detection in Flink?
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, May 29, 2023 at 7:24 PM Paul Lam  wrote:
> >
> > > Hi team,
> > >
> > > I’d like to start a discussion about FLIP-316 [1], which introduces a
> SQL
> > > driver as the
> > > default main class for Flink SQL jobs.
> > >
> > > Currently, Flink SQL could be executed out of the box either via SQL
> > > Client/Gateway
> > > or embedded in a Flink Java/Python program.
> > >
> > > However, each one has its drawback:
> > >
> > > - SQL Client/Gateway doesn’t support the application deployment mode
> [2]
> > > - Flink Java/Python program requires extra work to write a non-SQL
> > program
> > >
> > > Therefore, I propose adding a SQL driver to act as the default main
> class
> > > for SQL jobs.
> > > Please see the FLIP docs for details and feel free to comment. Thanks!
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> > > >
> > > [2] https://issues.apache.org/jira/browse/FLINK-26541 <
> > > https://issues.apache.org/jira/browse/FLINK-26541>
> > >
> > > Best,
> > > Paul Lam
> >
>


[jira] [Created] (FLINK-32219) sql client would be pending after executing plan of inserting

2023-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-32219:


 Summary: sql client would be pending after executing plan of 
inserting
 Key: FLINK-32219
 URL: https://issues.apache.org/jira/browse/FLINK-32219
 Project: Flink
  Issue Type: Bug
Reporter: Shuai Xu


I compiled a plan for an insert statement first and then executed the plan. 
However, the sql client keeps pending after running the execute plan statement. Here is 
part of the stacktrace:

{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-30 Thread liu ron
Hi, Shammon

Thanks for driving this FLIP, it will enhance the Flink metadata capability
from the platform product perspective. The overall design looks good to me,
I just have some small questions:
1. Regarding the CatalogModificationListenerFactory#createListener method, I
think it would be better to pass a Context as its parameter instead of two
specific objects (a small sketch follows at the end of this mail). In this way,
we can easily extend it in the future and
there will be no compatibility problems. Refer to
https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
2. In the FLIP, you mentioned that multiple Flink tables may refer to the same
physical table, so will the Listener report this physical table repeatedly?
3. When registering a Listener object, will it connect to an external
system such as Datahub? If the Listener object registration times out due
to permission issues, it will affect the execution of all subsequent SQL;
what should we do in this case?
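
For question 1, here is a minimal sketch of what I mean, mirroring DynamicTableFactory.Context
(the names and the contents of the Context are only my assumption, not the FLIP's current
definition):

```java
/** Sketch only: a Context-based factory method; names and Context contents are assumptions. */
public interface CatalogModificationListenerFactory {

    CatalogModificationListener createListener(Context context);

    /** Wrapping the arguments keeps the signature stable when new fields are needed later. */
    interface Context {
        Configuration getConfiguration();

        ClassLoader getUserClassLoader();
    }
}
```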

Best,
Ron

Shammon FY  于2023年5月31日周三 08:53写道:

> Thanks Feng, the catalog modification listener is only used to report
> read-only ddl information to other components or systems.
>
> > 1. Will an exception thrown by the listener affect the normal execution
> process?
>
> Users need to handle the exception in the listener themselves. Many DDLs
> such as drop tables and alter tables cannot be rolled back, Flink cannot
> handle these exceptions for the listener. It will cause the operation to
> exit if an exception is thrown, but the executed DDL will be successful.
>
> > 2. What is the order of execution? Is the listener executed first or are
> specific operations executed first?  If I want to perform DDL permission
> verification(such as integrating with Ranger based on the listener) , is
> that possible?
>
> The listener will be notified to report catalog modification after DDLs are
> successful, so you can not do permission verification for DDL in the
> listener. As mentioned above, Flink will not roll back the DDL even when
> the listener throws an exception. I think permission verification is
> another issue and can be discussed separately.
>
>
> Best,
> Shammon FY
>
> On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
>
> > Hi, Shammon
> >
> > Thanks for driving this Flip, [Support Customized Job Meta Data Listener]
> > will  make it easier for Flink to collect lineage information.
> > I fully agree with the overall solution and have a small question:
> >
> > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > 2. What is the order of execution? Is the listener executed first or are
> > specific operations executed first?  If I want to perform DDL permission
> > verification(such as integrating with Ranger based on the listener) , is
> > that possible?
> >
> >
> > Best,
> > Feng
> >
> > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> >
> > > Hi devs,
> > >
> > > We would like to bring up a discussion about FLIP-294: Support
> Customized
> > > Job Meta Data Listener[1]. We have had several discussions with Jark
> Wu,
> > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> and
> > > interfaces, and thanks for their valuable advice.
> > > The overall job and connector information is divided into metadata and
> > > lineage, this FLIP focuses on metadata and lineage will be discussed in
> > > another FLIP in the future. In this FLIP we want to add a customized
> > > listener in Flink to report catalog modifications to external metadata
> > > systems such as datahub[2] or atlas[3]. Users can view the specific
> > > information of connectors such as source and sink for Flink jobs in
> these
> > > systems, including fields, watermarks, partitions, etc.
> > >
> > > Looking forward to hearing from you, thanks.
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > [2] https://datahub.io/
> > > [3] https://atlas.apache.org/#/
> > >
> >
>


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-30 Thread Paul Lam
Hi Weihua,

Thanks a lot for your input! Please see my comments inline.

> - Is SQLRunner the better name? We use this to run a SQL Job. (Not strong,
> the SQLDriver is fine for me)

I’ve thought about SQL Runner but picked SQL Driver for the following reasons 
FYI:

1. I have a PythonDriver doing the same job for PyFlink [1]
2. Flink program's main class is sort of like Driver in JDBC which translates 
SQLs into
database-specific languages.

In general, I’m +1 for SQL Driver and +0 for SQL Runner.

> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare
> a SQL file in an image for Kubernetes application mode, which may be a bit
> cumbersome.

Do you mean passing the SQL string as a configuration or a program argument? 

I thought it might be convenient for testing purposes, but not recommended for 
production,
because Flink SQLs could be complicated and involve lots of characters that need 
to be escaped.

WDYT?

> - I noticed that we don't specify the SQLDriver jar in the "run-application"
> command. Does that mean we need to perform automatic detection in Flink?

Yes! It’s like running a PyFlink job with the following command:

```
./bin/flink run \
  --pyModule table.word_count \
  --pyFiles examples/python/table
```

The CLI determines if it’s a SQL job, and if so applies the SQL Driver automatically.


[1] 
https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java

Best,
Paul Lam

> 2023年5月30日 21:56,Weihua Hu  写道:
> 
> Thanks Paul for the proposal.
> 
> +1 for this. It is valuable in improving ease of use.
> 
> I have a few questions.
> - Is SQLRunner the better name? We use this to run a SQL Job. (Not strong,
> the SQLDriver is fine for me)
> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare
> a SQL file in an image for Kubernetes application mode, which may be a bit
> cumbersome.
> - I noticed that we don't specify the SQLDriver jar in the "run-application"
> command. Does that mean we need to perform automatic detection in Flink?
> 
> 
> Best,
> Weihua
> 
> 
> On Mon, May 29, 2023 at 7:24 PM Paul Lam  wrote:
> 
>> Hi team,
>> 
>> I’d like to start a discussion about FLIP-316 [1], which introduces a SQL
>> driver as the
>> default main class for Flink SQL jobs.
>> 
>> Currently, Flink SQL could be executed out of the box either via SQL
>> Client/Gateway
>> or embedded in a Flink Java/Python program.
>> 
>> However, each one has its drawback:
>> 
>> - SQL Client/Gateway doesn’t support the application deployment mode [2]
>> - Flink Java/Python program requires extra work to write a non-SQL program
>> 
>> Therefore, I propose adding a SQL driver to act as the default main class
>> for SQL jobs.
>> Please see the FLIP docs for details and feel free to comment. Thanks!
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
>>> 
>> [2] https://issues.apache.org/jira/browse/FLINK-26541 <
>> https://issues.apache.org/jira/browse/FLINK-26541>
>> 
>> Best,
>> Paul Lam



Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-30 Thread liu ron
Hi, Feng

Thanks for driving this FLIP, time travel is very useful for Flink to
integrate with data lake systems. I have one question: why is the implementation
of TimeTravel delegated to the Catalog? Assuming that we use Flink to query a
Hudi table with the time travel syntax, but we don't use the HudiCatalog and
instead register the hudi table to the InMemoryCatalog, can we support
time travel for the Hudi table in this case?
In contrast, I think time travel should be bound to the connector instead of the
Catalog, so the rejected alternative should be reconsidered.

Best,
Ron

yuxia  于2023年5月30日周二 09:40写道:

> Hi, Feng.
> Notice this FLIP only support batch mode for time travel.  Would it also
> make sense to support stream mode to a read a snapshot of the table as a
> bounded stream?
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: "Benchao Li" 
> 收件人: "dev" 
> 发送时间: 星期一, 2023年 5 月 29日 下午 6:04:53
> 主题: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>
> # Can Calcite support this syntax ` VERSION AS OF`  ?
>
> This also depends on whether this is defined in standard or any known
> databases that have implemented this. If not, it would be hard to push it
> to Calcite.
>
> # getTable(ObjectPath object, long timestamp)
>
> Then we again come to the problem of "casting between timestamp and
> numeric", which has been disabled in FLINK-21978[1]. If you're gonna use
> this, then we need to clarify that problem first.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21978
>
>
> Feng Jin  于2023年5月29日周一 15:57写道:
>
> > hi, thanks for your reply.
> >
> > @Benchao
> > > did you consider the pushdown abilities compatible
> >
> > In the current design, the implementation of TimeTravel is delegated to
> > Catalog. We have added a function called getTable(ObjectPath tablePath,
> > long timestamp) to obtain the corresponding CatalogBaseTable at a
> specific
> > time.  Therefore, I think it will not have any impact on the original
> > pushdown abilities.
> >
> >
> > >   I see there is a rejected  design for adding SupportsTimeTravel, but
> I
> > didn't see the alternative in  the FLIP doc
> >
> > Sorry, the document description is not very clear.  Regarding whether to
> > support SupportTimeTravel, I have discussed it with yuxia. Since we have
> > already passed the corresponding time in getTable(ObjectPath, long
> > timestamp) of Catalog, SupportTimeTravel may not be necessary.
> >
> > In getTable(ObjectPath object, long timestamp), we can obtain the schema
> of
> > the corresponding time point and put the SNAPSHOT that needs to be
> consumed
> > into options.
> >
> >
> > @Shammon
> > > Could we support this in Flink too?
> >
> > I personally think it's possible, but limited by Calcite's syntax
> > restrictions. I believe we should first support this syntax in Calcite.
> > Currently, I think it may not be easy  to support this syntax in Flink's
> > parser. @Benchao, what do you think? Can Calcite support this syntax
> > ` VERSION AS OF`  ?
> >
> >
> > Best,
> > Feng.
> >
> >
> > On Fri, May 26, 2023 at 2:55 PM Shammon FY  wrote:
> >
> > > Thanks Feng, the feature of time travel sounds great!
> > >
> > > In addition to SYSTEM_TIME, lake houses such as paimon and iceberg
> > support
> > > snapshot or version. For example, users can query snapshot 1 for paimon
> > by
> > > the following statement
> > > SELECT * FROM t VERSION AS OF 1
> > >
> > > Could we support this in Flink too?
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Fri, May 26, 2023 at 1:20 PM Benchao Li 
> wrote:
> > >
> > > > Regarding the implementation, did you consider the pushdown abilities
> > > > compatible, e.g., projection pushdown, filter pushdown, partition
> > > pushdown.
> > > > Since `Snapshot` is not handled much in existing rules, I have a
> > concern
> > > > about this. Of course, it depends on your implementation detail, what
> > is
> > > > important is that we'd better add some cross tests for these.
> > > >
> > > > Regarding the interface exposed to Connector, I see there is a
> rejected
> > > > design for adding SupportsTimeTravel, but I didn't see the
> alternative
> > in
> > > > the FLIP doc. IMO, this is an important thing we need to clarify
> > because
> > > we
> > > > need to know whether the Connector supports this, and what
> > > column/metadata
> > > > corresponds to 'system_time'.
> > > >
> > > > Feng Jin  于2023年5月25日周四 22:50写道:
> > > >
> > > > > Thanks for your reply
> > > > >
> > > > > @Timo @BenChao @yuxia
> > > > >
> > > > > Sorry for the mistake,  Currently , calcite only supports  `FOR
> > > > SYSTEM_TIME
> > > > > AS OF `  syntax.  We can only support `FOR SYSTEM_TIME AS OF` .
> I've
> > > > > updated the syntax part of the FLIP.
> > > > >
> > > > >
> > > > > @Timo
> > > > >
> > > > > > We will convert it to TIMESTAMP_LTZ?
> > > > >
> > > > > Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and then
> > > > convert
> > > > > it into a long value.
> > > > >
> > > > > > How do we want to query the most recent version of a tabl

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-30 Thread Paul Lam
Hi Shammon,

Thanks a lot for your input!

I thought SQL Driver could act as a general-purpose default main class
for Flink SQL. It could be used in Flink CLI submission, web submission,
or SQL Client/Gateway submission. For SQL Client/Gateway submission,
we use it implicitly if needed, and for the other approaches, explicitly.

WRT configurations, dynamic configurations are supported for most
cases ATM.

- For CLI submission, the dynamic configuration is supported via CLI, 
i.e. `-D` prefixed configurations. 
- For SQL Client/Gateway submission, we could also use `-D` dynamic 
configuration and SQL init file to customize the default session configuration.
- For web submission, there’s no dynamic configuration as far as I’m aware.

Do you think it’s sufficient already or is it still necessary to implement it at
the SQL driver level?

Best,
Paul Lam

> 2023年5月31日 10:40,Shammon FY  写道:
> 
> Thanks Paul for driving this proposal.
> 
> I found the sql driver has no config related options. If I understand
> correctly, the sql driver can be used to submit sql jobs in a 'job
> submission service' such as sql-gateway. In general, in addition to the
> default config for Flink cluster which includes k8s, ha and .etc, users may
> also specify configurations for SQL jobs, including parallelism, number of
> Task Managers, etc.
> 
> For example, in sql-gateway users can `set` dynamic parameters before
> submitting a sql statement, and for sql files users may put their
> configurations in a `yaml` file. Should sql driver support dynamic
> parameters and specify config file?
> 
> Best,
> Shammon FY
> 
> On Tue, May 30, 2023 at 9:57 PM Weihua Hu  wrote:
> 
>> Thanks Paul for the proposal.
>> 
>> +1 for this. It is valuable in improving ease of use.
>> 
>> I have a few questions.
>> - Is SQLRunner the better name? We use this to run a SQL Job. (Not strong,
>> the SQLDriver is fine for me)
>> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare
>> a SQL file in an image for Kubernetes application mode, which may be a bit
>> cumbersome.
>> - I noticed that we don't specify the SQLDriver jar in the
>> "run-application"
>> command. Does that mean we need to perform automatic detection in Flink?
>> 
>> 
>> Best,
>> Weihua
>> 
>> 
>> On Mon, May 29, 2023 at 7:24 PM Paul Lam  wrote:
>> 
>>> Hi team,
>>> 
>>> I’d like to start a discussion about FLIP-316 [1], which introduces a SQL
>>> driver as the
>>> default main class for Flink SQL jobs.
>>> 
>>> Currently, Flink SQL could be executed out of the box either via SQL
>>> Client/Gateway
>>> or embedded in a Flink Java/Python program.
>>> 
>>> However, each one has its drawback:
>>> 
>>> - SQL Client/Gateway doesn’t support the application deployment mode [2]
>>> - Flink Java/Python program requires extra work to write a non-SQL
>> program
>>> 
>>> Therefore, I propose adding a SQL driver to act as the default main class
>>> for SQL jobs.
>>> Please see the FLIP docs for details and feel free to comment. Thanks!
>>> 
>>> [1]
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
>>> <
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
 
>>> [2] https://issues.apache.org/jira/browse/FLINK-26541 <
>>> https://issues.apache.org/jira/browse/FLINK-26541>
>>> 
>>> Best,
>>> Paul Lam
>> 



Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-30 Thread Biao Geng
Thanks Paul for the proposal! I believe it would be very useful for flink
users.
After reading the FLIP, I have some questions:
1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs in
Application mode? More specifically, if we use SQL client/gateway to
execute some interactive SQLs like a SELECT query, can we ask flink to use
Application mode to execute those queries after this FLIP?
2. Deployment: I believe in YARN mode, the implementation is trivial as we
can ship files via YARN's tools easily, but for K8s, things can be more
complicated as Shengkai said. I have implemented a simple POC
based on SQL client before (i.e. consider the SQL client which supports
executing a SQL file as the SQL driver in this FLIP). One problem I have
met is how we ship SQL files (or the Job Graph) to the K8s side. Without
such support, users have to modify the initContainer or rebuild a new K8s
image every time to fetch the SQL file. Like the flink k8s operator, one
workaround is to utilize the flink config (transforming the SQL file to an
escaped string like Weihua mentioned) which will be converted to a
ConfigMap, but K8s has a size limit on ConfigMaps (no larger than 1MB). Not sure
if we have better solutions.
3. Serialization of SessionState: in SessionState, there are some
unserializable fields
like org.apache.flink.table.resource.ResourceManager#userClassLoader. It
may be worthwhile to add more details about the serialization part.

Best,
Biao Geng

Paul Lam  于2023年5月31日周三 11:49写道:

> Hi Weihua,
>
> Thanks a lot for your input! Please see my comments inline.
>
> > - Is SQLRunner the better name? We use this to run a SQL Job. (Not
> strong,
> > the SQLDriver is fine for me)
>
> I’ve thought about SQL Runner but picked SQL Driver for the following
> reasons FYI:
>
> 1. I have a PythonDriver doing the same job for PyFlink [1]
> 2. Flink program's main class is sort of like Driver in JDBC which
> translates SQLs into
> databases specific languages.
>
> In general, I’m +1 for SQL Driver and +0 for SQL Runner.
>
> > - Could we run SQL jobs using SQL in strings? Otherwise, we need to
> prepare
> > a SQL file in an image for Kubernetes application mode, which may be a
> bit
> > cumbersome.
>
> Do you mean a pass the SQL string a configuration or a program argument?
>
> I thought it might be convenient for testing propose, but not recommended
> for production,
> cause Flink SQLs could be complicated and involves lots of characters that
> need to escape.
>
> WDYT?
>
> > - I noticed that we don't specify the SQLDriver jar in the
> "run-application"
> > command. Does that mean we need to perform automatic detection in Flink?
>
> Yes! It’s like running a PyFlink job with the following command:
>
> ```
> ./bin/flink run \
>   --pyModule table.word_count \
>   --pyFiles examples/python/table
> ```
>
> The CLI determines if it’s a SQL job, if yes apply the SQL Driver
> automatically.
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
>
> Best,
> Paul Lam
>
> > 2023年5月30日 21:56,Weihua Hu  写道:
> >
> > Thanks Paul for the proposal.
> >
> > +1 for this. It is valuable in improving ease of use.
> >
> > I have a few questions.
> > - Is SQLRunner the better name? We use this to run a SQL Job. (Not
> strong,
> > the SQLDriver is fine for me)
> > - Could we run SQL jobs using SQL in strings? Otherwise, we need to
> prepare
> > a SQL file in an image for Kubernetes application mode, which may be a
> bit
> > cumbersome.
> > - I noticed that we don't specify the SQLDriver jar in the
> "run-application"
> > command. Does that mean we need to perform automatic detection in Flink?
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, May 29, 2023 at 7:24 PM Paul Lam  wrote:
> >
> >> Hi team,
> >>
> >> I’d like to start a discussion about FLIP-316 [1], which introduces a
> SQL
> >> driver as the
> >> default main class for Flink SQL jobs.
> >>
> >> Currently, Flink SQL could be executed out of the box either via SQL
> >> Client/Gateway
> >> or embedded in a Flink Java/Python program.
> >>
> >> However, each one has its drawback:
> >>
> >> - SQL Client/Gateway doesn’t support the application deployment mode [2]
> >> - Flink Java/Python program requires extra work to write a non-SQL
> program
> >>
> >> Therefore, I propose adding a SQL driver to act as the default main
> class
> >> for SQL jobs.
> >> Please see the FLIP docs for details and feel free to comment. Thanks!
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> >>>
> >> [2] https://issues.apache.org/jira/browse/FLINK-26541 <
> >> https://issues.apache.org/jira/browse/FLINK-26541>
> >>
> >> Best,
> >> Paul La

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-30 Thread Paul Lam
Sorry for the typo. I meant “We already have a PythonDriver doing the same job 
for PyFlink."

Best,
Paul Lam

> 2023年5月31日 11:49,Paul Lam  写道:
> 
> 1. I have a PythonDriver doing the same job for PyFlink [1]



Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-05-30 Thread liu ron
Hi, Feng

Thanks for driving this FLIP, this proposal is very useful for catalog
management.
I have some small questions:

1. Regarding the CatalogStoreFactory#createCatalogStore method, do we need
to provide a default implementation?
2. If we get a Catalog from CatalogStore, after initializing it, do we
put it into the Map of catalogs again?
3. Regarding the options `sql.catalog.store.type` and
`sql.catalog.store.file.path`, how about renaming them to
`catalog.store.type` and `catalog.store.path`?

Best,
Ron

Feng Jin  于2023年5月29日周一 21:19写道:

> Hi yuxia
>
>  > But from the code in Proposed Changes, once we register the Catalog, we
> initialize it and open it. right?
>
> Yes, In order to avoid inconsistent semantics of the original CREATE
> CATALOG DDL, Catalog will be directly initialized in registerCatalog so
> that parameter validation can be performed.
>
> In the current design, lazy initialization is mainly reflected in
> getCatalog. If CatalogStore has already saved some catalog configurations,
> only initialization is required in getCatalog.
>
>
> Best,
> Feng
>
> On Mon, May 29, 2023 at 8:27 PM yuxia  wrote:
>
> > Hi, Feng.
> > I'm trying to understanding the meaning of *lazy initialization*. If i'm
> > wrong, please correct me.
> >
> > IIUC, lazy initialization means only you need to access the catalog, then
> > you initialize it. But from the code in Proposed Changes, once we
> register
> > the Catalog,
> > we initialize it and open it. right?
> >
> > Best regards,
> > Yuxia
> >
> > - 原始邮件 -
> > 发件人: "Jing Ge" 
> > 收件人: "dev" 
> > 发送时间: 星期一, 2023年 5 月 29日 下午 5:12:46
> > 主题: Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration
> > and asynchronous registration
> >
> > Hi Feng,
> >
> > Thanks for your effort! +1 for the proposal.
> >
> > One of the major changes is that current design will provide
> > Map catalogs as a snapshot instead of a cache, which
> means
> > once it has been initialized, any changes done by other sessions will not
> > affect it. Point 6 described follow-up options for further improvement.
> >
> > Best regards,
> > Jing
> >
> > On Mon, May 29, 2023 at 5:31 AM Feng Jin  wrote:
> >
> > > Hi all, I would like to update you on the latest progress of the FLIP.
> > >
> > >
> > > Last week, Leonard Xu, HangRuan, Jing Ge, Shammon FY, ShengKai Fang
> and I
> > > had an offline discussion regarding the overall solution for Flink
> > > CatalogStore. We have reached a consensus and I have updated the final
> > > solution in FLIP.
> > >
> > > Next, let me briefly describe the entire design:
> > >
> > >1.
> > >
> > >Introduce CatalogDescriptor to store catalog configuration similar
> to
> > >TableDescriptor.
> > >2.
> > >
> > >The two key functions of CatalogStore - void storeCatalog(String
> > >catalogName, CatalogDescriptor) and CatalogDescriptor
> > getCatalog(String)
> > >will both use CatalogDescriptor instead of Catalog instance. This
> way,
> > >CatalogStore will only be responsible for saving and retrieving
> > catalog
> > >configurations without having to initialize catalogs.
> > >3.
> > >
> > >The default registerCatalog(String catalogName, Catalog catalog)
> > >function in CatalogManager will be marked as deprecated.
> > >4.
> > >
> > >A new function registerCatalog(String catalogName, CatalogDescriptor
> > >catalog) will be added to serve as the default registration function
> > for
> > >catalogs in CatalogManager.
> > >5.
> > >
> > >    Map<String, Catalog> catalogs in CatalogManager will remain unchanged
> > and
> > >    save initialized catalogs. This means that deletion operations from
> one
> > >session won't synchronize with other sessions.
> > >6.
> > >
> > >To support multi-session synchronization scenarios for deletions
> later
> > >    on we should make Map<String, Catalog> catalogs configurable. There may
> > be
> > >three possible situations:
> > >
> > >a.Default caching of all initialized catalogs
> > >
> > >b.Introduction of LRU cache logic which can automatically clear
> > >long-unused catalogs.
> > >
> > >c.No caching of any instances; each call to getCatalog creates a new
> > >instance.
> > >
> > >
> > > This is the document for discussion:
> > >
> > >
> >
> https://docs.google.com/document/d/1HRJNd4_id7i6cUxGnAybmYZIwl5g1SmZCOzGdUz-6lU/edit
> > >
> > > This is the final proposal document:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > >
> > >
> > > Thank you very much for your attention and suggestions on this FLIP.  A
> > > special thanks to Hang Ruan for his suggestions on the entire design
> and
> > > organizing offline discussions.
> > >
> > > If you have any further suggestions or feedback about this FLIP please
> > feel
> > > free to share.
> > >
> > >
> > > Best,
> > >
> > > Feng
> > >
> > > On Sat, May 6, 2023 at 8:32 PM Jing Ge 
> > wrote:
> > >
> > > 

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-30 Thread yuxia
Thanks Shammon for driving it. 
The FLIP generally looks good to me. I only have one question.
WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name and the 
new CatalogDatabase after modification. Is it enough to only pass the origin database 
name? Would it be better to also contain the origin CatalogDatabase so that the listener 
has a way to know what changed?
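
For example, the event could carry both versions, roughly like this (field names are only an
illustration of the suggestion, not the FLIP's current definition):

```java
/** Sketch of the suggestion: expose both the old and the new database so listeners can diff them. */
public interface AlterDatabaseEvent extends CatalogModificationEvent {

    String databaseName();

    /** The database as it was before the ALTER statement, which lets the listener see what changed. */
    CatalogDatabase originDatabase();

    /** The database after the ALTER statement. */
    CatalogDatabase newDatabase();
}
```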

Best regards,
Yuxia

- 原始邮件 -
发件人: "ron9 liu" 
收件人: "dev" 
发送时间: 星期三, 2023年 5 月 31日 上午 11:36:04
主题: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

Hi, Shammon

Thanks for driving this FLIP, It will enforce the Flink metadata capability
from the platform produce perspective. The overall design looks good to me,
I just have some small question:
1. Regarding CatalogModificationListenerFactory#createListener method, I
think it would be better to pass Context as its parameter instead of two
specific Object. In this way, we can easily extend it in the future and
there will be no compatibility problems. Refer to
https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
2. In FLIP, you mentioned that multiple Flink tables may refer to the same
physical table, so does the Listener report this physical table repeatedly?
3. When registering a Listener object, will it connect to an external
system such as Datahub? If the Listener object registration times out due
to permission issues, it will affect the execution of all subsequent SQL,
what should we do in this case?

Best,
Ron

Shammon FY  于2023年5月31日周三 08:53写道:

> Thanks Feng, the catalog modification listener is only used to report
> read-only ddl information to other components or systems.
>
> > 1. Will an exception thrown by the listener affect the normal execution
> process?
>
> Users need to handle the exception in the listener themselves. Many DDLs
> such as drop tables and alter tables cannot be rolled back, Flink cannot
> handle these exceptions for the listener. It will cause the operation to
> exit if an exception is thrown, but the executed DDL will be successful.
>
> > 2. What is the order of execution? Is the listener executed first or are
> specific operations executed first?  If I want to perform DDL permission
> verification(such as integrating with Ranger based on the listener) , is
> that possible?
>
> The listener will be notified to report catalog modification after DDLs are
> successful, so you can not do permission verification for DDL in the
> listener. As mentioned above, Flink will not roll back the DDL even when
> the listener throws an exception. I think permission verification is
> another issue and can be discussed separately.
>
>
> Best,
> Shammon FY
>
> On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
>
> > Hi, Shammon
> >
> > Thanks for driving this Flip, [Support Customized Job Meta Data Listener]
> > will  make it easier for Flink to collect lineage information.
> > I fully agree with the overall solution and have a small question:
> >
> > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > 2. What is the order of execution? Is the listener executed first or are
> > specific operations executed first?  If I want to perform DDL permission
> > verification(such as integrating with Ranger based on the listener) , is
> > that possible?
> >
> >
> > Best,
> > Feng
> >
> > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> >
> > > Hi devs,
> > >
> > > We would like to bring up a discussion about FLIP-294: Support
> Customized
> > > Job Meta Data Listener[1]. We have had several discussions with Jark
> Wu,
> > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> and
> > > interfaces, and thanks for their valuable advice.
> > > The overall job and connector information is divided into metadata and
> > > lineage, this FLIP focuses on metadata and lineage will be discussed in
> > > another FLIP in the future. In this FLIP we want to add a customized
> > > listener in Flink to report catalog modifications to external metadata
> > > systems such as datahub[2] or atlas[3]. Users can view the specific
> > > information of connectors such as source and sink for Flink jobs in
> these
> > > systems, including fields, watermarks, partitions, etc.
> > >
> > > Looking forward to hearing from you, thanks.
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > [2] https://datahub.io/
> > > [3] https://atlas.apache.org/#/
> > >
> >
>


Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-05-30 Thread liu ron
Hi, Jingsong

Thanks for your valuable suggestions.

Best,
Ron
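
For readers new to operator fusion codegen, below is a toy Java sketch contrasting
per-record operator chaining with a single fused loop. It is illustrative only, not
Flink's actual generated code, and all names in it are made up; see the quoted
discussion below for the real motivation and numbers.

import java.util.HashMap;
import java.util.Map;

public class FusionToy {

    // Interpreted style: each operator pushes records downstream via a virtual call.
    interface Collector {
        void collect(long value);
    }

    static void calcThenCollect(long[] input, Collector downstream) {
        for (long v : input) {
            if (v > 0) {                  // "Calc" operator: filter + projection
                downstream.collect(v * 2);
            }
        }
    }

    // Fused style: calc and hash aggregation are inlined into one loop,
    // removing the per-record virtual call and collector overhead.
    static Map<Long, Long> fusedCalcHashAgg(long[] input) {
        Map<Long, Long> agg = new HashMap<>();
        for (long v : input) {
            if (v > 0) {
                long projected = v * 2;                          // inlined "Calc"
                agg.merge(projected % 10, projected, Long::sum); // inlined "HashAgg"
            }
        }
        return agg;
    }
}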

On Tue, May 30, 2023 at 1:22 PM Jingsong Li  wrote:

> Thanks Ron for your information.
>
> I suggest that it can be written in the Motivation of FLIP.
>
> Best,
> Jingsong
>
> On Tue, May 30, 2023 at 9:57 AM liu ron  wrote:
> >
> > Hi, Jingsong
> >
> > Thanks for your review. We have tested it with TPC-DS, and got a 12%
> > gain overall when supporting only the Calc, HashJoin and HashAgg operators. In
> > some queries we even get more than a 30% gain, so it looks like an effective
> > way.
> >
> > Best,
> > Ron
> >
> > On Mon, May 29, 2023 at 2:33 PM Jingsong Li  wrote:
> >
> > > Thanks Ron for the proposal.
> > >
> > > Do you have some benchmark results for the performance improvement? I
> > > am more concerned about the improvement on Flink than the data in
> > > other papers.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, May 29, 2023 at 2:16 PM liu ron  wrote:
> > > >
> > > > Hi, dev
> > > >
> > > > I'd like to start a discussion about FLIP-315: Support Operator
> Fusion
> > > > Codegen for Flink SQL[1]
> > > >
> > > > As main memory grows, query performance is more and more determined
> by
> > > the
> > > > raw CPU costs of query processing itself. This is because query
> > > > processing techniques based on interpreted execution show poor
> > > performance
> > > > on modern CPUs due to lack of locality and frequent instruction
> > > > mis-prediction. Therefore, the industry is also researching how to
> > > improve
> > > > engine performance by increasing operator execution efficiency. In
> > > > addition, during the process of optimizing Flink's performance for
> TPC-DS
> > > > queries, we found that a significant amount of CPU time was spent on
> > > > virtual function calls, framework collector calls, and invalid
> > > > calculations, which can be optimized to improve the overall engine
> > > > performance. After some investigation, we found Operator Fusion
> Codegen
> > > > which is proposed by Thomas Neumann in the paper[2] can address these
> > > > problems. I have finished a PoC[3] to verify its feasibility and
> > > validity.
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > [1]:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
> > > > [2]: http://www.vldb.org/pvldb/vol4/p539-neumann.pdf
> > > > [3]: https://github.com/lsyldliu/flink/tree/OFCG
> > > >
> > > > Best,
> > > > Ron
> > >
>


[jira] [Created] (FLINK-32220) Improving the adaptive local hash agg code to avoid get value from RowData repeatedly

2023-05-30 Thread dalongliu (Jira)
dalongliu created FLINK-32220:
-

 Summary: Improving the adaptive local hash agg code to avoid get 
value from RowData repeatedly
 Key: FLINK-32220
 URL: https://issues.apache.org/jira/browse/FLINK-32220
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: dalongliu
 Fix For: 1.18.0
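
Illustrative sketch of the improvement (not the actual generated code for adaptive
local hash agg): read a field from RowData once into a local variable instead of
calling the accessor for every use.

import org.apache.flink.table.data.RowData;

public class RepeatedAccessSketch {

    // Before: the same field is fetched from RowData twice per record.
    static long sumFieldTwiceBefore(RowData row) {
        return row.getLong(0) + row.getLong(0);
    }

    // After: fetch once, then reuse the local variable.
    static long sumFieldTwiceAfter(RowData row) {
        final long field0 = row.getLong(0);
        return field0 + field0;
    }
}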






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-30 Thread Shammon FY
Hi ron,

Thanks for your feedback.

> 1. Regarding CatalogModificationListenerFactory#createListener method, I
think it would be better to pass Context as its parameter instead of two
specific Object. In this way, we can easily extend it in the future and
there will be no compatibility problems.

It sounds good to me. I will add a Context to
CatalogModificationListenerFactory, thanks.
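
For discussion, a minimal sketch of what the factory could look like with a Context;
the accessors on Context below are illustrative assumptions only, and the actual
definition will be in the FLIP.

import org.apache.flink.configuration.ReadableConfig;

/** Listener interface proposed in FLIP-294; body elided in this sketch. */
interface CatalogModificationListener {}

public interface CatalogModificationListenerFactory {

    CatalogModificationListener createListener(Context context);

    /** Carrier object that can gain new accessors later without breaking implementations. */
    interface Context {
        ReadableConfig getConfiguration();   // hypothetical accessor
        ClassLoader getUserClassLoader();    // hypothetical accessor
    }
}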

> 2. In FLIP, you mentioned that multiple Flink tables may refer to the
same physical table, so does the Listener report this physical table
repeatedly?

Yes, the listeners for different jobs may receive the same physical table, so
users should check and update the table information based on the
identifier. For example, users may create tables on the same Kafka topic in
different jobs, which will notify listeners for the same Kafka topic.
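
As an illustration of checking by identifier on the user side, a rough sketch in
plain helper code; nothing below is part of the FLIP's API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

public class IdempotentTableReporter {

    // Last reported schema (or properties) per identifier, e.g. "catalog.db.table".
    private final Map<String, String> lastReported = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> externalSink;

    public IdempotentTableReporter(BiConsumer<String, String> externalSink) {
        this.externalSink = externalSink;
    }

    /** Called from a listener; pushes to the external system only when something changed. */
    public void report(String identifier, String schema) {
        String previous = lastReported.put(identifier, schema);
        if (!schema.equals(previous)) {
            externalSink.accept(identifier, schema);
        }
    }
}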

> 3. When registering a Listener object, will it connect to an external
system such as Datahub? If the Listener object registration times out due
to permission issues, it will affect the execution of all subsequent SQL,
what should we do in this case?

Users should establish connections to external systems when creating a
listener as needed, and they should handle the exceptions too. If users
fail to create a listener and throw an exception, Flink will throw the
exception too.

Best,
Shammon FY

On Wed, May 31, 2023 at 11:36 AM liu ron  wrote:

> Hi, Shammon
>
> Thanks for driving this FLIP, It will enforce the Flink metadata capability
> from the platform produce perspective. The overall design looks good to me,
> I just have some small question:
> 1. Regarding CatalogModificationListenerFactory#createListener method, I
> think it would be better to pass Context as its parameter instead of two
> specific Object. In this way, we can easily extend it in the future and
> there will be no compatibility problems. Refer to
>
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> 2. In FLIP, you mentioned that multiple Flink tables may refer to the same
> physical table, so does the Listener report this physical table repeatedly?
> 3. When registering a Listener object, will it connect to an external
> system such as Datahub? If the Listener object registration times out due
> to permission issues, it will affect the execution of all subsequent SQL,
> what should we do in this case?
>
> Best,
> Ron
>
> On Wed, May 31, 2023 at 8:53 AM Shammon FY  wrote:
>
> > Thanks Feng, the catalog modification listener is only used to report
> > read-only ddl information to other components or systems.
> >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > Users need to handle the exception in the listener themselves. Many DDLs
> > such as drop tables and alter tables cannot be rolled back, Flink cannot
> > handle these exceptions for the listener. It will cause the operation to
> > exit if an exception is thrown, but the executed DDL will be successful.
> >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > specific operations executed first?  If I want to perform DDL permission
> > verification(such as integrating with Ranger based on the listener) , is
> > that possible?
> >
> > The listener will be notified to report catalog modification after DDLs
> are
> > successful, so you can not do permission verification for DDL in the
> > listener. As mentioned above, Flink will not roll back the DDL even when
> > the listener throws an exception. I think permission verification is
> > another issue and can be discussed separately.
> >
> >
> > Best,
> > Shammon FY
> >
> > On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
> >
> > > Hi, Shammon
> > >
> > > Thanks for driving this Flip, [Support Customized Job Meta Data
> Listener]
> > > will  make it easier for Flink to collect lineage information.
> > > I fully agree with the overall solution and have a small question:
> > >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > > process?
> > >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > > specific operations executed first?  If I want to perform DDL
> permission
> > > verification(such as integrating with Ranger based on the listener) ,
> is
> > > that possible?
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> > >
> > > > Hi devs,
> > > >
> > > > We would like to bring up a discussion about FLIP-294: Support
> > Customized
> > > > Job Meta Data Listener[1]. We have had several discussions with Jark
> > Wu,
> > > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> > and
> > > > interfaces, and thanks for their valuable advice.
> > > > The overall job and connector information is divided into metadata
> and
> > > > lineage, this FLIP focuses on metadata and lineage will be discussed
> in
> 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-30 Thread Aitozi
Hi Jing,
    What do you think about it? Can we move forward with this feature?
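
For context, here is a minimal sketch of what a user-defined async table function
and the lateral table syntax described in the quoted message below could look like
once supported. The class name, registration statement, and query are illustrative
only; in current Flink such a function is only usable when provided by a
LookupTableSource.

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
public class MyAsyncLookup extends AsyncTableFunction<Row> {

    private transient ExecutorService executor;

    @Override
    public void open(FunctionContext context) {
        executor = Executors.newFixedThreadPool(4);
    }

    // The framework completes the result collection asynchronously via the future.
    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        CompletableFuture
            .supplyAsync(() -> lookupExternally(key), executor)
            .whenComplete((rows, err) -> {
                if (err != null) {
                    result.completeExceptionally(err);
                } else {
                    result.complete(rows);
                }
            });
    }

    private Collection<Row> lookupExternally(String key) {
        // Placeholder for a real async/remote lookup.
        return Collections.singleton(Row.of(key, "value-for-" + key));
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }
}

// Intended usage with lateral table syntax once the FLIP is in place (illustrative):
//   CREATE TEMPORARY FUNCTION my_async_lookup AS 'com.example.MyAsyncLookup';
//   SELECT t.id, l.v FROM T AS t, LATERAL TABLE(my_async_lookup(t.id)) AS l(k, v);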

Thanks,
Aitozi.

On Mon, May 29, 2023 at 9:56 AM Aitozi  wrote:

> Hi Jing,
> > "Do you mean to support the AyncTableFunction beyond the
> LookupTableSource?"
> Yes, I mean to support the AyncTableFunction beyond the LookupTableSource.
>
> The "AsyncTableFunction" is the function with ability to be executed async
> (with AsyncWaitOperator).
> The async lookup join is a one of usage of this. So, we don't have to bind
> the AyncTableFunction with LookupTableSource.
> If User-defined AsyncTableFunction is supported, user can directly use
> lateral table syntax to perform async operation.
>
> > "It would be better if you could elaborate the proposed changes wrt the
> CorrelatedCodeGenerator with more details"
>
> In the proposal, we use lateral table syntax to support the async table
> function. So the planner will also treat this statement to a
> CommonExecCorrelate node. So the runtime code should be generated in
> CorrelatedCodeGenerator.
> In CorrelatedCodeGenerator, we will know the TableFunction's Kind of
> `FunctionKind.Table` or `FunctionKind.ASYNC_TABLE`
> For `FunctionKind.ASYNC_TABLE` we can generate an AsyncWaitOperator to
> execute the async table function.
>
>
> Thanks,
> Aitozi.
>
>
> On Mon, May 29, 2023 at 3:22 AM Jing Ge  wrote:
>
>> Hi Aitozi,
>>
>> Thanks for the clarification. The naming "Lookup" might suggest using it
>> for table look up. But conceptually what the eval() method will do is to
>> get a collection of results(Row, RowData) from the given keys. How it will
>> be done depends on the implementation, i.e. you can implement your own
>> Source[1][2]. The example in the FLIP should be able to be handled in this
>> way.
>>
>> Do you mean to support the AsyncTableFunction beyond the LookupTableSource?
>> It would be better if you could elaborate the proposed changes wrt the
>> CorrelatedCodeGenerator with more details. Thanks!
>>
>> Best regards,
>> Jing
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
>> [2]
>>
>> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
>>
>> On Sat, May 27, 2023 at 9:48 AM Aitozi  wrote:
>>
>> > Hi Jing,
>> > Thanks for your response. As stated in the FLIP, the purpose of this
>> > FLIP is to support
>> > user-defined async table functions. As described in the Flink document [1]
>> >
>> > Async table functions are special functions for table sources that
>> perform
>> > > a lookup.
>> > >
>> >
>> > So end users cannot directly define and use async table functions now. A
>> > use case is reported in [2]
>> >
>> > So, in conclusion, no new interface is introduced, but we extend the
>> > ability to support user-defined async table function.
>> >
>> > [1]:
>> >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
>> > [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>> >
>> > Thanks.
>> > Aitozi.
>> >
>> >
>> > On Sat, May 27, 2023 at 6:40 AM Jing Ge  wrote:
>> >
>> > > Hi Aitozi,
>> > >
>> > > Thanks for your proposal. I am not quite sure if I understood your
>> > thoughts
>> > > correctly. You described a special case implementation of the
>> > > AsyncTableFunction with no public API changes. Would you please
>> elaborate
>> > > your purpose of writing a FLIP according to the FLIP documentation[1]?
>> > > Thanks!
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > > On Wed, May 24, 2023 at 1:07 PM Aitozi  wrote:
>> > >
>> > > > May I ask for some feedback  :D
>> > > >
>> > > > Thanks,
>> > > > Aitozi
>> > > >
>> > > > On Tue, May 23, 2023 at 7:14 PM Aitozi  wrote:
>> > > > >
>> > > > > Just catch an user case report from Giannis Polyzos for this
>> usage:
>> > > > >
>> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>> > > > >
>> > > > > On Tue, May 23, 2023 at 5:45 PM Aitozi  wrote:
>> > > > > >
>> > > > > > Hi guys,
>> > > > > > I want to bring up a discussion about adding support of User
>> > > > > > Defined AsyncTableFunction in Flink.
>> > > > > > Currently, async table function are special functions for table
>> > > source
>> > > > > > to perform
>> > > > > > async lookup. However, it's worth supporting the user defined
>> async
>> > > > > > table function.
>> > > > > > Because, in this way, the end SQL user can leverage it to
>> perform
>> > the
>> > > > > > async operation
>> > > > > > which is useful to maximize the system throughput, especially for
>> IO
>> > > > > > bottleneck case.
>> > > > > >
>> > > > > > You can find some more detail in [1].
>> > > > > >
>> > > > > > Looking forward to feedback
>> > > > > >
>> > > > > >
>> > > > >

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-30 Thread Shammon FY
Hi yuxia

Thanks for your input. The `AlterDatabaseEvent` extends
`DatabaseModificationEvent` which has the original database.
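
For clarity, a rough sketch of that relationship; class shapes and field names are
illustrative assumptions, not the FLIP's final definitions.

import org.apache.flink.table.catalog.CatalogDatabase;

class DatabaseModificationEvent {
    final String databaseName;
    final CatalogDatabase database; // the original CatalogDatabase

    DatabaseModificationEvent(String databaseName, CatalogDatabase database) {
        this.databaseName = databaseName;
        this.database = database;
    }
}

class AlterDatabaseEvent extends DatabaseModificationEvent {
    final CatalogDatabase newDatabase; // the database definition after ALTER DATABASE

    AlterDatabaseEvent(String databaseName, CatalogDatabase original, CatalogDatabase newDatabase) {
        super(databaseName, original);
        this.newDatabase = newDatabase;
    }
    // A listener can compare `database` (original) with `newDatabase` to see what changed.
}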

Best,
Shammon FY

On Wed, May 31, 2023 at 2:24 PM yuxia  wrote:

> Thanks Shammon for driving it.
> The FLIP generally looks good to me. I only have one question.
> WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name and
> the new CatalogDatabase after modification. Is it enough to only pass the origin
> database name? Would it be better to also contain the origin CatalogDatabase so
> that the listener has a way to know what changed?
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "ron9 liu" 
> To: "dev" 
> Sent: Wednesday, May 31, 2023 11:36:04 AM
> Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
>
> Hi, Shammon
>
> Thanks for driving this FLIP, It will enforce the Flink metadata capability
> from the platform produce perspective. The overall design looks good to me,
> I just have some small question:
> 1. Regarding CatalogModificationListenerFactory#createListener method, I
> think it would be better to pass Context as its parameter instead of two
> specific Object. In this way, we can easily extend it in the future and
> there will be no compatibility problems. Refer to
>
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> 2. In FLIP, you mentioned that multiple Flink tables may refer to the same
> physical table, so does the Listener report this physical table repeatedly?
> 3. When registering a Listener object, will it connect to an external
> system such as Datahub? If the Listener object registration times out due
> to permission issues, it will affect the execution of all subsequent SQL,
> what should we do in this case?
>
> Best,
> Ron
>
> On Wed, May 31, 2023 at 8:53 AM Shammon FY  wrote:
>
> > Thanks Feng, the catalog modification listener is only used to report
> > read-only ddl information to other components or systems.
> >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > Users need to handle the exception in the listener themselves. Many DDLs
> > such as drop tables and alter tables cannot be rolled back, Flink cannot
> > handle these exceptions for the listener. It will cause the operation to
> > exit if an exception is thrown, but the executed DDL will be successful.
> >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > specific operations executed first?  If I want to perform DDL permission
> > verification(such as integrating with Ranger based on the listener) , is
> > that possible?
> >
> > The listener will be notified to report catalog modification after DDLs
> are
> > successful, so you can not do permission verification for DDL in the
> > listener. As mentioned above, Flink will not roll back the DDL even when
> > the listener throws an exception. I think permission verification is
> > another issue and can be discussed separately.
> >
> >
> > Best,
> > Shammon FY
> >
> > On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
> >
> > > Hi, Shammon
> > >
> > > Thanks for driving this Flip, [Support Customized Job Meta Data
> Listener]
> > > will  make it easier for Flink to collect lineage information.
> > > I fully agree with the overall solution and have a small question:
> > >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > > process?
> > >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > > specific operations executed first?  If I want to perform DDL
> permission
> > > verification(such as integrating with Ranger based on the listener) ,
> is
> > > that possible?
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> > >
> > > > Hi devs,
> > > >
> > > > We would like to bring up a discussion about FLIP-294: Support
> > Customized
> > > > Job Meta Data Listener[1]. We have had several discussions with Jark
> > Wu,
> > > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> > and
> > > > interfaces, and thanks for their valuable advice.
> > > > The overall job and connector information is divided into metadata
> and
> > > > lineage, this FLIP focuses on metadata and lineage will be discussed
> in
> > > > another FLIP in the future. In this FLIP we want to add a customized
> > > > listener in Flink to report catalog modifications to external
> metadata
> > > > systems such as datahub[2] or atlas[3]. Users can view the specific
> > > > information of connectors such as source and sink for Flink jobs in
> > these
> > > > systems, including fields, watermarks, partitions, etc.
> > > >
> > > > Looking forward to hearing from you, thanks.
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > [2] htt

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Dong Lin
Hi Piotr,

Thanks for the reply. Please see my comments inline.

On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
wrote:

> Hi Dong,
>
> First of all we don't need to send any extra signal from source (or non
> source) operators. All of the operators are already reporting backpressured
> metrics [1]
> and all of the metrics are already sent to JobManager. We would only need
>

Hmm... I am not sure metrics such as isBackPressured are already sent to
the JM. According to the doc,
this metric is only available on the TaskManager. And I couldn't find the code
that sends these metrics to the JM. Can you help provide a link to the code and
doc that shows this metric is reported to the JM?

Suppose this metric is indeed reported to JM, we also need to confirm that
the frequency meets our need. For example, typically metrics are updated on
the order of seconds. The default metric reporter interval (as specified in
MetricOptions) is 10 seconds, which is probably not sufficient for the
suggested approach to work reliably. This is because the longer the
interval, the more likely it is that the algorithm will not trigger checkpoints
at the short interval even when no subtask is backpressured.

For example, let's say every source operator subtask reports this metric to
JM once every 10 seconds. There are 100 source subtasks. And each subtask
is backpressured roughly 10% of the total time due to traffic spikes (and
limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
chance that there is at least one subtask that is backpressured. Then we
have to wait for at least 10 seconds to check again. The expected
checkpointing interval can be very close to 30 minutes in the use-case
mentioned earlier.
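
For reference, the arithmetic above can be reproduced with a few lines (illustrative only):

public class BackpressureOdds {
    public static void main(String[] args) {
        int subtasks = 100;
        double backpressuredFraction = 0.10; // each subtask backpressured ~10% of the time
        // Probability that, at a sampling instant, at least one subtask is backpressured.
        double pAtLeastOne = 1.0 - Math.pow(1.0 - backpressuredFraction, subtasks);
        System.out.printf("P(at least one backpressured) = %.5f%n", pAtLeastOne); // ~0.99997
    }
}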

> to pass some accessor to the metrics to the `CheckpointTrigger`.


> > execution.checkpointing.interval.no-backpressure
>
> Maybe that's the way to go, but as I mentioned before, I could see this
> `CheckpointTrigger` to be a pluggable component, that could have been
> configured
> the same way as `MetricReporters` are right now [2]. We could just provide
> out of the box two plugins, one implementing current checkpoint triggering
> strategy,
> and the other using backpressure.
>

Yes, it is possible to add a CheckpointTrigger as a pluggable component. I
am open to this idea as long as it provides benefits over the job-level
config (e.g. covers more use-case, or simpler configuration for
common-case).

I think we can decide how to let user specify this interval after we are
able to address the other issues related to the feasibility and reliability
of the suggested approach.
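
To make the discussion concrete, a purely hypothetical shape of such a pluggable
trigger; nothing like this exists in Flink today, and all names are made up.

import java.time.Duration;

public interface CheckpointTrigger {

    /** Decides the interval to use until the next evaluation, based on job metrics. */
    Duration nextCheckpointInterval(Metrics metrics);

    /** Minimal metric view a trigger might need for a backpressure-based strategy. */
    interface Metrics {
        boolean anySubtaskBackpressured();
    }
}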


> > I think the root cause of this issue is that the decision of the
> > checkpointing interval really depends on the expected impact of a
> > checkpoint on the throughput.
>
> Yes, I agree. Ideally we probably should adjust the checkpointing interval
> based on measured latency, for example using latency markers [3], but that
> would
> require some investigation if latency markers are indeed that costly as
> documented and if so optimizing them to solve the performance degradation
> of enabling
> e2e latency tracking.


> However, given that the new back pressure monitoring strategy would be
> optional AND users could implement their own `CheckpointTrigger` if really
> needed
> AND I have a feeling that there might be an even better solution (more
> about that later).
>

Overall I guess you are suggesting that 1) we can optimize the overhead of
latency tracking so that we can always turn it on and 2) we can use the
measured latency to dynamically determine checkpointing interval.

I can understand this intuition. Still, the devil is in the details. After
thinking more about this, I am not sure I can find a good way to make it
work. I am happy to discuss proc/cons if you provide more concrete
solutions.

Note that goals of the alternative approach include 1) support sources
other than HybridSource and 2) reduce checkpointing interval when the job
is backpressured. These goals are not necessary to achieve the use-case
targed by FLIP-309. While it will be nice to support additional use-cases
with one proposal, it is probably also reasonable to make incremental
progress and support the low-hanging-fruit use-case first. The choice
really depends on the complexity and the importance of supporting the extra
use-cases.

I am hoping that we can still be open to using the approach proposed in
FLIP-309 if we cannot make the alternative approach work. What do you
think?


> > if the checkpointing overhead is
> > close to none, then it is beneficial to the e2e latency to still
> checkpoint
> > a high frequency even if there exists (intermittent) backpressure.
>
> In that case users could just configure a slow checkpointing interval to a
> lower value, or just use static checkpoint interval strategy.
>

I guess the point is that the suggested approach, which dynamically
determines the checkpoi