Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-06-09 Thread Benchao Li
Thanks Leonard for the input, the "implicit type conversion" way sounds good
to me.
I also agree that this should be done in the planner instead of the connector;
it'll be a lot easier for connector development.

Leonard Xu wrote on Fri, Jun 9, 2023 at 20:11:

> About the semantics consideration, I have some new input after rethink.
>
> 1. We can support both TIMESTAMP and TIMESTAMP_LTZ expressions following
> the syntax  `SELECT [column_name(s)] FROM [table_name] FOR SYSTEM_TIME AS
> OF `
>
> 2. For the TIMESTAMP_LTZ type, giving a long instant value to the CatalogTable is
> pretty intuitive; for the TIMESTAMP type, it will be implicitly cast to
> TIMESTAMP_LTZ by the planner using the session timezone and then passed to the
> CatalogTable. This case can be considered as a function AsOfSnapshot(Table
> t, TIMESTAMP_LTZ arg), which takes an arg of TIMESTAMP_LTZ type, but our
> framework supports implicit type conversion, so users can also pass an arg
> with TIMESTAMP type. Note that Spark[1] does the implicit type conversion too.
>
> 3. I also considered handing the implicit type conversion over to the
> connector instead of the planner, such as passing a TIMESTAMP literal and having the
> connector use the session timezone to perform the type conversion, but this
> is more complicated than the planner handling above, and it's not friendly
> to the connector developers.
>
> 4. The last point: TIMESTAMP_LTZ '1970-01-01 00:00:04.001' should be an
> invalid expression, because you cannot define an instant point (i.e.
> TIMESTAMP_LTZ semantics in SQL) from a timestamp literal without a timezone.
> You can use an explicit type conversion like `cast(ts_ntz as TIMESTAMP_LTZ)`
> after `FOR SYSTEM_TIME AS OF ` if you want to use a
> timestamp type/expression/literal without a timezone.
>
> 5. One final point: the TIMESTAMP_LTZ type of Flink SQL supports DST
> time[2] well, which will help users avoid many corner cases.
>
>
> Best,
> Leonard
>
> [1]
> https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/#daylight-saving-time-support
>
>
>
>
> > On Jun 9, 2023, at 1:13 PM, Benchao Li  wrote:
> >
> > As you can see, you must use `UNIX_TIMESTAMP` to do this work; that's
> > where the time zone comes into play.
> >
> > What I'm talking about is casting timestamp/timestamp_ltz to long
> > directly; that's why the semantics are tricky when you cast a timestamp
> > to long using a time zone.
> >
> > Other systems, such as SQL Server[1], actually use a string
> > instead of a timestamp literal, `FOR SYSTEM_TIME AS OF '2021-01-01
> > 00:00:00.000'`; I'm not sure whether they convert the string implicitly
> > to TIMESTAMP_LTZ, or they just have a different definition of the syntax.
> >
> > But for us, we are definitely using timestamp/timestamp_ltz literals here;
> > that's why it is special, and we must highlight this behavior that we are
> > converting a timestamp-without-time-zone literal to long using the session
> > time zone.
> >
> > [1]
> >
> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16
> >
> > Feng Jin wrote on Thu, Jun 8, 2023 at 11:35:
> >
> >> Hi all,
> >>
> >> thanks for your input
> >>
> >>
> >> @Benchao
> >>
> >>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
> >> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC
> timezone,
> >> which is not usually expected by users.
> >>
> >> It was indeed the case before Flink 1.13, but now my understanding is
> that
> >> there have been some slight changes in the definition of TIMESTAMP.
> >>
> >> TIMESTAMP is currently used to specify the year, month, day, hour, minute
> >> and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col
> >> AS STRING))* to convert between *TIMESTAMP values* and *long values*. The
> >> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore,
> >> converting either TIMESTAMP or TIMESTAMP_LTZ to long values will involve
> >> using the *LOCAL TIME ZONE*.
> >>
> >>
> >> Here is a test:
> >>
> >> Flink SQL> SET 'table.local-time-zone' = 'UTC';
> >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
> >> STRING)) as `timestamp`;
> >> ---
> >> timestamp
> >> --
> >> 0
> >>
> >> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
> >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
> >> STRING)) as `timestamp`;
> >> ---
> >> timestamp
> >> --
> >> -28800
> >>
> >> Therefore, the current conversion method exposed to users is also using
> >> LOCAL TIME ZONE.
> >>
> >>
> >> @yuxia
> >>
> >> Thank you very much for providing the list of behaviors of TIMESTAMP in
> >> other systems.
> >>
> >>> I think we can align them to avoid the inconsistency with other engines
> and
> >> provide convenience for the 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-09 Thread Aitozi
Hi Jing,
I mean the join key is not necessarily the primary key or a unique
index of the database.
In this situation, we may query out multiple rows for one join key. I think
that's why
LookupFunction#lookup returns a collection of RowData.

BTW, I think the behavior of lookup join will not affect the semantics of
the async UDTF.
We use the async TableFunction here, and the table function can collect
multiple rows.
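As a concrete illustration (a minimal sketch with hypothetical names; `Orders` is the
streaming side, `Rates` is the lookup table, and `proc_time` is a processing-time
attribute of `Orders`):

SELECT o.order_id, o.currency, r.rate
FROM Orders AS o
JOIN Rates FOR SYSTEM_TIME AS OF o.proc_time AS r
ON o.currency = r.currency;
-- If `currency` is not unique in Rates, LookupFunction#lookup returns several
-- RowData for one key row and each of them is collected, so a single order can
-- produce multiple joined output rows.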

Thanks,
Aitozi.



Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:

> Hi Aitozi,
>
> The keyRow used in this case contains all keys[1].
>
> Best regards,
> Jing
>
> [1]
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
>
>
> On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
>
> > Hi Jing,
> >
> >  The performance test is added to the FLIP.
> >
> >  As I know, the lookup join can return multiple rows; it depends on
> > whether the join key
> > is the primary key of the external database or not. The `lookup` [1] will
> > return a collection of
> > joined results, and each of them will be collected
> >
> >
> > [1]:
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> >
> >
> > Thanks,
> > Aitozi.
> >
> > Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for the feedback. Looking forward to the performance tests.
> > >
> > > Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
> > > lookup function is used to enrich column(s) from the dimension table.
> If,
> > > for the given key, there will be more than one row, there will be no
> way
> > to
> > > know which row will be used to enrich the key.
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > [2]
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi  wrote:
> > >
> > > > Hi Jing
> > > > Thanks for your good questions. I have updated the example to the
> > > FLIP.
> > > >
> > > > > Only one row for each lookup
> > > > lookup can also return multiple rows, based on the query result. [1]
> > > >
> > > > [1]:
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> > > >
> > > > > If we use async calls with lateral join, my gut feeling is
> > > > that we might have many more async calls than lookup join. I am not
> > > really
> > > > sure if we will be facing potential issues in this case or not.
> > > >
> > > > IMO, the work pattern is similar to the lookup function, for each row
> > > from
> > > > the left table,
> > > > it will evaluate the eval method once, so the async call numbers will
> > not
> > > > change, and the maximum number of calls in flight is limited by the
> > > > async operator's buffer capacity,
> > > > which will be controlled by the option.
> > > >
> > > > BTW, for the naming of these option, I updated the FLIP about this
> you
> > > can
> > > > refer to
> > > > the section of "ConfigOption" and "Rejected Alternatives"
> > > >
> > > > In the end, for the performance evaluation, I'd like to do some tests
> > and
> > > > will update it to the FLIP doc
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > >
> > > > Jing Ge wrote on Fri, Jun 9, 2023 at 07:23:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > Thanks for the clarification. The code example looks interesting. I
> > > would
> > > > > suggest adding them into the FLIP. The description with code
> examples
> > > > will
> > > > > help readers understand the motivation and how to use it. Afaiac,
> it
> > > is a
> > > > > valid feature for Flink users.
> > > > >
> > > > > As we knew, lookup join is based on temporal join, i.e. FOR
> > SYSTEM_TIME
> > > > AS
> > > > > OF which is also used in your code example. Temporal join performs
> > the
> > > > > lookup based on the processing time match. Only one row for each
> > > > > lookup(afaiu, I need to check the source code to double confirm)
> will
> > > > > return for further enrichment. On the other hand, lateral join
> will
> > > have
> > > > > sub-queries correlated with every individual value of the reference
> > > table
> > > > > from the preceding part of the query and each sub query will return
> > > > > multiple rows. If we use async calls with lateral join, my gut
> > feeling
> > > is
> > > > > that we might have many more async calls than lookup join. I am not
> > > > really
> > > > 

Re: [DISCUSS] FLIP-246: Multi Cluster Kafka Source

2023-06-09 Thread Tzu-Li (Gordon) Tai
> Regarding (2), definitely. This is something we planned to add later on
but
so far keeping things common has been working well.

My main worry for doing this as a later iteration is that this would
probably be a breaking change for the public interface. If that can be
avoided and planned ahead, I'm fine with moving forward with how it is
right now.

> DynamicKafkaSource may be confusing because it is really similar to the
KafkaDynamicSource/Sink (table connectors).

The table / sql Kafka connectors (KafkaDynamicTableFactory,
KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal classes
not really meant to be exposed to the user though.
It can cause some confusion internally for the code maintainers, but on the
actual public surface I don't see this being an issue.

Thanks,
Gordon

On Wed, Jun 7, 2023 at 8:55 PM Mason Chen  wrote:

> Hi Gordon,
>
> Thanks for taking a look!
>
> Regarding (1), there is a need from the readers to send this event at
> startup because the reader state may reflect outdated metadata. Thus, the
> reader should not start without fresh metadata. With fresh metadata, the
> reader can filter splits from state--this filtering capability is
> ultimately how we solve the common issue of "I re-configured my Kafka
> source and removed some topic, but it refers to the old topic due to state
> *[1]*". I did not mention this because I thought this is more of a detail
> but I'll make a brief note of it.
>
> Regarding (2), definitely. This is something we planned to add later on but
> so far keeping things common has been working well. In that regard, yes the
> metadata service should expose these configurations but the source should
> not check it into state unlike the other metadata. I'm going to add it to a
> section called "future enhancements". This is also feedback that Ryan, an
> interested user, gave earlier in this thread.
>
> Regarding (3), that's definitely a good point and there are some real use
> cases, in addition to what you mentioned, to use this in single cluster
> mode (see *[1] *above). DynamicKafkaSource may be confusing because it is
> really similar to the KafkaDynamicSource/Sink (table connectors).
>
> Best,
> Mason
>
> On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai 
> wrote:
>
> > Hi Mason,
> >
> > Thanks for updating the FLIP. In principle, I believe this would be a
> > useful addition. Some comments so far:
> >
> > 1. In this sequence diagram [1], why is there a need for a
> > GetMetadataUpdateEvent from the MultiClusterSourceReader going to the
> > MultiClusterSourceEnumerator? Shouldn't the enumerator simply start
> sending
> > metadata update events to the reader once it is registered at the
> > enumerator?
> >
> > 2. Looking at the new builder API, there's a few configurations that are
> > common across *all *discovered Kafka clusters / topics, specifically the
> > deserialization schema, offset initialization strategy, Kafka client
> > properties, and consumer group ID. Is there any use case that users would
> > want to have these configurations differ across different Kafka clusters?
> > If that's the case, would it make more sense to encapsulate these
> > configurations to be owned by the metadata service?
> >
> > 3. Is MultiClusterKafkaSource the best name for this connector? I find
> > the dynamic aspect of Kafka connectivity to be a more defining
> > characteristic, and that is the main advantage it has compared to the
> > static KafkaSource. A user may want to use this new connector over
> > KafkaSource even if they're just consuming from a single Kafka cluster;
> for
> > example, one immediate use case I can think of is Kafka repartitioning
> with
> > zero Flink job downtime. They create a new topic with higher parallelism
> > and repartition their Kafka records from the old topic to the new topic,
> > and they want the consuming Flink job to be able to move from the old
> topic
> > to the new topic with zero-downtime while retaining exactly-once
> > guarantees. So, perhaps DynamicKafkaSource is a better name for this
> > connector?
> >
> > Thanks,
> > Gordon
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source?preview=/217389320/255072018/image-2023-6-7_2-29-13.png
> >
> > On Wed, Jun 7, 2023 at 3:07 AM Mason Chen 
> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the prompt feedback! I had some confusion with how to resize
> > > images in confluence--anyways, I have made the font bigger, added white
> > > background, and also made the diagrams themselves bigger.
> > >
> > > Regarding the exactly once semantics, that's definitely good to point
> out
> > > in the doc. Thus, I have broken out my "Basic Idea" section into:
> > > 1. an intro
> > > 2. details about KafkaMetadataService
> > > 3. details about KafkaStream and KafkaClusterId (the metadata)
> > > 4. details about exactly once semantics and consistency guarantees
> > >
> > > This should give readers enough context 

[jira] [Created] (FLINK-32305) History Server is slow at starting

2023-06-09 Thread Yiming Zang (Jira)
Yiming Zang created FLINK-32305:
---

 Summary: History Server is slow at starting
 Key: FLINK-32305
 URL: https://issues.apache.org/jira/browse/FLINK-32305
 Project: Flink
  Issue Type: Bug
Reporter: Yiming Zang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-06-09 Thread Jing Ge
Hi Samrat,

The FLIP looks good, thanks!

Best regards,
Jing


On Tue, Jun 6, 2023 at 8:16 PM Samrat Deb  wrote:

> Hi Jing,
>
> >  I would suggest adding that information into the
> FLIP.
>
> Updated now, please review the new version of the FLIP whenever you have time.
>
> > +1 Looking forward to your PR :-)
> I will request your review once I'm ready with the PR :-)
>
> Bests,
> Samrat
>
> On Tue, Jun 6, 2023 at 11:43 PM Samrat Deb  wrote:
>
> > Hi Martijn,
> >
> > > If I understand this correctly, the Redshift sink
> > would not be able to support exactly-once, is that correct?
> >
> > As I delve deeper into the study of Redshift's capabilities, I have
> > discovered that it does support "merge into" operations [1], with some
> > merge examples in [2].
> > This opens up the possibility of implementing exactly-once semantics with
> > the connector.
> > However, I believe it would be prudent to start with a more focused scope
> > for the initial phase of implementation and defer the exactly-once support
> > to subsequent iterations.
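For reference, a minimal sketch of the MERGE shape described in [1] and [2]
(table and column names here are hypothetical; see the AWS docs for the exact
Redshift syntax and restrictions):

MERGE INTO target_orders
USING staging_orders s ON target_orders.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET amount = s.amount
WHEN NOT MATCHED THEN INSERT VALUES (s.order_id, s.amount);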
> >
> > Before finalizing the approach, I would greatly appreciate your thoughts
> > and suggestions on this matter.
> > Should we prioritize the initial implementation without exactly-once
> > support, or would you advise incorporating it right from the start?
> > Your insights and experiences would be immensely valuable in making this
> > decision.
> >
> >
> > [1]
> >
> https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html
> > [2] https://docs.aws.amazon.com/redshift/latest/dg/merge-examples.html
> >
> > Bests,
> > Samrat
> >
> > On Mon, Jun 5, 2023 at 7:09 PM Jing Ge 
> wrote:
> >
> >> Hi Samrat,
> >>
> >> Thanks for the feedback. I would suggest adding that information into
> the
> >> FLIP.
> >>
> >> +1 Looking forward to your PR :-)
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Sat, Jun 3, 2023 at 9:19 PM Samrat Deb 
> wrote:
> >>
> >> > Hi Jing Ge,
> >> >
> >> > >>> Do you already have any prototype? I'd like to join the reviews.
> >> > The prototype is in progress. I will raise the dedicated PR for review
> >> > soon and notify in this thread as well.
> >> >
> >> > >>> Will the Redshift connector provide additional features
> >> > beyond the mediator/wrapper of the jdbc connector?
> >> >
> >> > Here are the additional features that the Flink connector for AWS
> >> Redshift
> >> > can provide on top of using JDBC:
> >> >
> >> > 1. Integration with AWS Redshift Workload Management (WLM): AWS
> Redshift
> >> > allows you to configure WLM[1] to manage query prioritization and
> >> resource
> >> > allocation. The Flink connector for Redshift will be agnostic to the
> >> > configured WLM and utilize it for scaling in and out for the sink.
> This
> >> > means that the connector can leverage the WLM capabilities of Redshift
> >> to
> >> > optimize the execution of queries and allocate resources efficiently
> >> based
> >> > on your defined workload priorities.
> >> >
> >> > 2. Abstraction of AWS Redshift Quotas and Limits: AWS Redshift imposes
> >> > certain quotas and limits[2] on various aspects such as the number of
> >> > clusters, concurrent connections, queries per second, etc. The Flink
> >> > connector for Redshift will provide an abstraction layer for users,
> >> > allowing them to work with Redshift without having to worry about
> these
> >> > specific limits. The connector will handle the management of
> connections
> >> > and queries within the defined quotas and limits, abstracting away the
> >> > complexity and ensuring compliance with Redshift's restrictions.
> >> >
> >> > These features aim to simplify the integration of Flink with AWS
> >> Redshift,
> >> > providing optimized resource utilization and transparent handling of
> >> > Redshift-specific limitations.
> >> >
> >> > Bests,
> >> > Samrat
> >> >
> >> > [1]
> >> >
> >> >
> >>
> https://docs.aws.amazon.com/redshift/latest/dg/cm-c-implementing-workload-management.html
> >> > [2]
> >> >
> >> >
> >>
> https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html
> >> >
> >> > On Sat, Jun 3, 2023 at 11:40 PM Samrat Deb 
> >> wrote:
> >> >
> >> > > Hi Ahmed,
> >> > >
> >> > > >>> please let me know If you need any collaboration regarding
> >> > integration
> >> > > with
> >> > > AWS connectors credential providers or regarding FLIP-171 I would be
> >> more
> >> > > than happy to assist.
> >> > >
> >> > > Sure, I will reach out in case any help is required.
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Jun 2, 2023 at 6:12 PM Jing Ge 
> >> > wrote:
> >> > >
> >> > >> Hi Samrat,
> >> > >>
> >> > >> Excited to see your proposal. Supporting data warehouses is one of
> >> the
> >> > >> major tracks for Flink. Thanks for driving it! Happy to see that we
> >> > >> reached
> >> > >> consensus to prioritize the Sink over Source in the previous
> >> discussion.
> >> > >> Do
> >> > >> you already have any prototype? I'd like to join the reviews.
> >> > >>
> >> > >> Just out 

[jira] [Created] (FLINK-32304) Reduce rpc-akka jar

2023-06-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32304:


 Summary: Reduce rpc-akka jar
 Key: FLINK-32304
 URL: https://issues.apache.org/jira/browse/FLINK-32304
 Project: Flink
  Issue Type: Improvement
  Components: Build System, Runtime / RPC
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0, 1.17.2


We bundle unnecessary dependencies in the rpc-akka jar; we can easily shave off
15 MB of dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32303) Incorrect error message in KafkaSource

2023-06-09 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-32303:
--

 Summary: Incorrect error message in KafkaSource 
 Key: FLINK-32303
 URL: https://issues.apache.org/jira/browse/FLINK-32303
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1, 1.18.0
Reporter: Piotr Nowojski


When an exception is thrown from an operator chained with a KafkaSource,
the KafkaSource returns a misleading error, as shown below:

{noformat}
java.io.IOException: Failed to deserialize consumer record due to
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
 ~[classes/:?]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
 ~[classes/:?]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
 ~[classes/:?]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:852)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:801) 
~[classes/:?]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[classes/:?]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[classes/:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
 ~[classes/:?]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[classes/:?]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[classes/:?]
at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[classes/:?]
at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
 ~[classes/:?]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[classes/:?]
... 14 more
Caused by: org.apache.flink.runtime.operators.testutils.ExpectedTestException: 
Failover!
at 
org.apache.flink.test.ManySmallJobsBenchmarkITCase$ThrottlingAndFailingIdentityMap.map(ManySmallJobsBenchmarkITCase.java:263)
 ~[test-classes/:?]
at 
org.apache.flink.test.ManySmallJobsBenchmarkITCase$ThrottlingAndFailingIdentityMap.map(ManySmallJobsBenchmarkITCase.java:243)
 ~[test-classes/:?]
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[classes/:?]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
 ~[classes/:?]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[classes/:?]
   

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-09 Thread Jing Ge
Hi Aitozi,

The keyRow used in this case contains all keys[1].

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49


On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:

> Hi Jing,
>
>  The performance test is added to the FLIP.
>
>  As I know, the lookup join can return multiple rows; it depends on
> whether the join key
> is the primary key of the external database or not. The `lookup` [1] will
> return a collection of
> joined results, and each of them will be collected
>
>
> [1]:
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
>
>
> Thanks,
> Aitozi.
>
Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
>
> > Hi Aitozi,
> >
> > Thanks for the feedback. Looking forward to the performance tests.
> >
> > Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
> > lookup function is used to enrich column(s) from the dimension table. If,
> > for the given key, there will be more than one row, there will be no way
> to
> > know which row will be used to enrich the key.
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > [2]
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> >
> > Best regards,
> > Jing
> >
> > On Fri, Jun 9, 2023 at 5:18 AM Aitozi  wrote:
> >
> > > Hi Jing
> > > Thanks for your good questions. I have updated the example to the
> > FLIP.
> > >
> > > > Only one row for each lookup
> > > lookup can also return multiple rows, based on the query result. [1]
> > >
> > > [1]:
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> > >
> > > > If we use async calls with lateral join, my gut feeling is
> > > that we might have many more async calls than lookup join. I am not
> > really
> > > sure if we will be facing potential issues in this case or not.
> > >
> > > IMO, the work pattern is similar to the lookup function, for each row
> > from
> > > the left table,
> > > it will evaluate the eval method once, so the async call numbers will
> not
> > > change, and the maximum number of calls in flight is limited by the
> > > async operator's buffer capacity,
> > > which will be controlled by the option.
> > >
> > > BTW, for the naming of these option, I updated the FLIP about this you
> > can
> > > refer to
> > > the section of "ConfigOption" and "Rejected Alternatives"
> > >
> > > In the end, for the performance evaluation, I'd like to do some tests
> and
> > > will update it to the FLIP doc
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > >
> > > Jing Ge wrote on Fri, Jun 9, 2023 at 07:23:
> > >
> > > > Hi Aitozi,
> > > >
> > > > Thanks for the clarification. The code example looks interesting. I
> > would
> > > > suggest adding them into the FLIP. The description with code examples
> > > will
> > > > help readers understand the motivation and how to use it. Afaiac, it
> > is a
> > > > valid feature for Flink users.
> > > >
> > > > As we knew, lookup join is based on temporal join, i.e. FOR
> SYSTEM_TIME
> > > AS
> > > > OF which is also used in your code example. Temporal join performs
> the
> > > > lookup based on the processing time match. Only one row for each
> > > > lookup(afaiu, I need to check the source code to double confirm) will
> > > > return for further enrichment. On the other hand, lateral join will
> > have
> > > > sub-queries correlated with every individual value of the reference
> > table
> > > > from the preceding part of the query and each sub query will return
> > > > multiple rows. If we use async calls with lateral join, my gut
> feeling
> > is
> > > > that we might have many more async calls than lookup join. I am not
> > > really
> > > > sure if we will be facing potential issues in this case or not.
> > Possible
> > > > issues I can think of now, e.g. too many RPC calls, too many async
> calls
> > > > processing, the sub query will return a table which might be (too)
> big,
> > > and
> > > > might cause performance issues. I would suggest preparing some use
> > cases
> > > > and running some performance tests to check it. These are my concerns
> > > about
> > > > using async calls with lateral join and I'd like to share with you,
> > happy
> > > > to discuss with you and hear different opinions, hopefully the
> > > > discussion could help me understand it more deeply. Please correct me
> > if
> > > I
> > > > am wrong.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > >
> > > > 

[jira] [Created] (FLINK-32302) Disable Hbase 2.x tests on Java 17

2023-06-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32302:


 Summary: Disable Hbase 2.x tests on Java 17
 Key: FLINK-32302
 URL: https://issues.apache.org/jira/browse/FLINK-32302
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / HBase, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Lacking support on the HBase side. Version bumps may solve it, but that's out 
of scope of this issue since the connector is being externalized.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-09 Thread Aitozi
Hi Jing,

 The performance test is added to the FLIP.

 As I know, the lookup join can return multiple rows; it depends on
whether the join key
is the primary key of the external database or not. The `lookup` [1] will
return a collection of
joined results, and each of them will be collected


[1]:
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52


Thanks,
Aitozi.

Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:

> Hi Aitozi,
>
> Thanks for the feedback. Looking forward to the performance tests.
>
> Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
> lookup function is used to enrich column(s) from the dimension table. If,
> for the given key, there will be more than one row, there will be no way to
> know which row will be used to enrich the key.
>
> [1]
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> [2]
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
>
> Best regards,
> Jing
>
> On Fri, Jun 9, 2023 at 5:18 AM Aitozi  wrote:
>
> > Hi Jing
> > Thanks for your good questions. I have updated the example to the
> FLIP.
> >
> > > Only one row for each lookup
> > lookup can also return multiple rows, based on the query result. [1]
> >
> > [1]:
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> >
> > > If we use async calls with lateral join, my gut feeling is
> > that we might have many more async calls than lookup join. I am not
> really
> > sure if we will be facing potential issues in this case or not.
> >
> > IMO, the work pattern is similar to the lookup function, for each row
> from
> > the left table,
> > it will evaluate the eval method once, so the async call numbers will not
> > change, and the maximum number of calls in flight is limited by the async
> > operator's buffer capacity,
> > which will be controlled by the option.
> >
> > BTW, for the naming of these option, I updated the FLIP about this you
> can
> > refer to
> > the section of "ConfigOption" and "Rejected Alternatives"
> >
> > In the end, for the performance evaluation, I'd like to do some tests and
> > will update it to the FLIP doc
> >
> > Thanks,
> > Aitozi.
> >
> >
> > Jing Ge wrote on Fri, Jun 9, 2023 at 07:23:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for the clarification. The code example looks interesting. I
> would
> > > suggest adding them into the FLIP. The description with code examples
> > will
> > > help readers understand the motivation and how to use it. Afaiac, it
> is a
> > > valid feature for Flink users.
> > >
> > > As we knew, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME
> > AS
> > > OF which is also used in your code example. Temporal join performs the
> > > lookup based on the processing time match. Only one row for each
> > > lookup(afaiu, I need to check the source code to double confirm) will
> > > return for further enrichment. On the other hand, lateral join will
> have
> > > sub-queries correlated with every individual value of the reference
> table
> > > from the preceding part of the query and each sub query will return
> > > multiple rows. If we use async calls with lateral join, my gut feeling
> is
> > > that we might have many more async calls than lookup join. I am not
> > really
> > > sure if we will be facing potential issues in this case or not.
> Possible
> > > issues I can think of now, e.g. too many RPC calls, too many async calls
> > > processing, the sub query will return a table which might be (too) big,
> > and
> > > might cause performance issues. I would suggest preparing some use
> cases
> > > and running some performance tests to check it. These are my concerns
> > about
> > > using async calls with lateral join and I'd like to share with you,
> happy
> > > to discuss with you and hear different opinions, hopefully the
> > > discussion could help me understand it more deeply. Please correct me
> if
> > I
> > > am wrong.
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:
> > >
> > > > Hi Mason,
> > > > Thanks for your input. I think if we support the user defined
> async
> > > > table function,
> > > > user will be able to use it to hold a batch data then handle it at
> one
> > > time
> > > > in the customized function.
> > > >
> > > > AsyncSink is meant for the sink operator. I have not figured out how
> > > > to integrate it in this case.
> > > >
> > > > Thanks,
> > > > Atiozi.
> > > >
> > > >
> > > > Mason Chen wrote on Thu, Jun 8, 2023 at 12:40:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > I think it makes 

[DISCUSS] Visibility on HybridSource or KafkaSource with multiple topics

2023-06-09 Thread Or Keren
Hey all,



*Motivation:*

Currently, when using either a HybridSource with multiple sources or a
KafkaSource with multiple topics inside, it's impossible to see how the
received records are distributed across the different sources. The
received-records metric is displayed as if there were a single source.



*Proposal:*
For the HybridSource:

   - Collapsible visibility on which sources are included in it, and how
   many records were received by each one of them.
   - An indicator showing which one of them is the active source.


For the KafkaSource:

   - Collapsible visibility on which topics are included in it, and how
   many records were received by each one of them.


[jira] [Created] (FLINK-32301) common.sh#create_ha_config should use set_config_key

2023-06-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32301:


 Summary: common.sh#create_ha_config should use set_config_key
 Key: FLINK-32301
 URL: https://issues.apache.org/jira/browse/FLINK-32301
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Instead of replacing the entire configuration, set the desired individual
options.
The current approach isn't great because it prevents us from setting required 
defaults in the flink-dist config.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32300) Support get object for result set

2023-06-09 Thread Fang Yong (Jira)
Fang Yong created FLINK-32300:
-

 Summary: Support get object for result set
 Key: FLINK-32300
 URL: https://issues.apache.org/jira/browse/FLINK-32300
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / JDBC
Affects Versions: 1.18.0
Reporter: Fang Yong


Support get object for result set



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-06-09 Thread Leonard Xu
About the semantics consideration, I have some new input after rethink.

1. We can support both TIMESTAMP and TIMESTAMP_LTZ expressions following the
syntax `SELECT [column_name(s)] FROM [table_name] FOR SYSTEM_TIME AS OF `

2. For the TIMESTAMP_LTZ type, giving a long instant value to the CatalogTable is pretty
intuitive; for the TIMESTAMP type, it will be implicitly cast to TIMESTAMP_LTZ by the
planner using the session timezone and then passed to the CatalogTable. This case can be
considered as a function AsOfSnapshot(Table t, TIMESTAMP_LTZ arg), which takes
an arg of TIMESTAMP_LTZ type, but our framework supports implicit type
conversion, so users can also pass an arg of TIMESTAMP type. Note that Spark[1] does
the implicit type conversion too.

3. I also considered handing the implicit type conversion over to the connector
instead of the planner, such as passing a TIMESTAMP literal and having the connector
use the session timezone to perform the type conversion, but this is more
complicated than the planner handling above, and it's not friendly to the
connector developers.

4. The last point: TIMESTAMP_LTZ '1970-01-01 00:00:04.001' should be an
invalid expression, because you cannot define an instant point (i.e. TIMESTAMP_LTZ
semantics in SQL) from a timestamp literal without a timezone. You can use an
explicit type conversion like `cast(ts_ntz as TIMESTAMP_LTZ)` after `FOR
SYSTEM_TIME AS OF ` if you want to use a
timestamp type/expression/literal without a timezone.

5. One final point: the TIMESTAMP_LTZ type of Flink SQL supports DST
time[2] well, which will help users avoid many corner cases.
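To make points 2 and 4 concrete, here is a minimal Flink SQL sketch of the proposed
syntax (assuming FLIP-308 is adopted; `orders_snapshot` and `ts_ntz` are
hypothetical names):

-- TIMESTAMP literal (no time zone): the planner implicitly casts it to
-- TIMESTAMP_LTZ using the session time zone before handing the instant to the
-- CatalogTable (point 2)
SELECT * FROM orders_snapshot FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00';

-- Explicit conversion of a timestamp-without-timezone expression (point 4)
SELECT * FROM orders_snapshot FOR SYSTEM_TIME AS OF CAST(ts_ntz AS TIMESTAMP_LTZ);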


Best,
Leonard

[1]https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
[2]https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/#daylight-saving-time-support




> On Jun 9, 2023, at 1:13 PM, Benchao Li  wrote:
> 
> As you can see, you must use `UNIX_TIMESTAMP` to do this work; that's
> where the time zone comes into play.
> 
> What I'm talking about is casting timestamp/timestamp_ltz to long directly;
> that's why the semantics are tricky when you cast a timestamp to long
> using a time zone.
> 
> Other systems, such as SQL Server[1], actually use a string
> instead of a timestamp literal, `FOR SYSTEM_TIME AS OF '2021-01-01
> 00:00:00.000'`; I'm not sure whether they convert the string implicitly
> to TIMESTAMP_LTZ, or they just have a different definition of the syntax.
> 
> But for us, we are definitely using timestamp/timestamp_ltz literals here;
> that's why it is special, and we must highlight this behavior that we are
> converting a timestamp-without-time-zone literal to long using the session
> time zone.
> 
> [1]
> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16
> 
> Feng Jin wrote on Thu, Jun 8, 2023 at 11:35:
> 
>> Hi all,
>> 
>> thanks for your input
>> 
>> 
>> @Benchao
>> 
>>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
>> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC timezone,
>> which is not usually expected by users.
>> 
>> It was indeed the case before Flink 1.13, but now my understanding is that
>> there have been some slight changes in the definition of TIMESTAMP.
>> 
>> TIMESTAMP is currently used to specify the year, month, day, hour, minute
>> and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col
>> AS STRING))* to convert between *TIMESTAMP values* and *long values*. The
>> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore,
>> converting either TIMESTAMP or TIMESTAMP_LTZ to long values will involve
>> using the *LOCAL TIME ZONE*.
>> 
>> 
>> Here is a test:
>> 
>> Flink SQL> SET 'table.local-time-zone' = 'UTC';
>> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
>> STRING)) as `timestamp`;
>> ---
>> timestamp
>> --
>> 0
>> 
>> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
>> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
>> STRING)) as `timestamp`;
>> ---
>> timestamp
>> --
>> -28800
>> 
>> Therefore, the current conversion method exposed to users is also using
>> LOCAL TIME ZONE.
>> 
>> 
>> @yuxia
>> 
>> Thank you very much for providing the list of behaviors of TIMESTAMP in
>> other systems.
>> 
>>> I think we can align them to avoid the inconsistency with other engines and
>> provide convenience for the external connectors while integrating Flink's
>> time travel API.
>> 
>> +1 for this.
>> 
>>> Regarding the inconsistency, I think we can consider time-travel as a
>> special case, and we do need to highlight this in this FLIP.
>> As for "violate the restriction outlined in FLINK-21978[1]", since we cast
>> timestamp to epochMillis only for internal use and won't expose it to
>> users, I don't think it will violate the restriction.

[jira] [Created] (FLINK-32299) Upload python jar when sql contains python udf jar

2023-06-09 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-32299:
-

 Summary: Upload python jar when sql contains python udf jar
 Key: FLINK-32299
 URL: https://issues.apache.org/jira/browse/FLINK-32299
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Gateway, Table SQL / Runtime
Reporter: Shengkai Fang


Currently, the SQL gateway always uploads the Python jar when submitting jobs.
However, it's not required for every SQL job. We should add the Python jar into
PipelineOptions.JARS only when user jobs contain a Python UDF.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Status of Statefun Project

2023-06-09 Thread Marco Villalobos
Why can't the Apache Software Foundation allow community members to bring it up 
to date?

What's the process for that?

I believe that there are people and companies on this mailing list interested 
in supporting Apache Flink Stateful Functions.

You already had two people on this thread express interest.

At the very least, we could keep the library versions up to date.

There is only a small list of new features that might be worthwhile:

1. event time processing
2. state rest api


> On Jun 6, 2023, at 3:06 AM, Chesnay Schepler  wrote:
> 
> If you were to fork it and want to redistribute it, then the short version is
> that
> 1. you have to adhere to the Apache licensing requirements
> 2. you have to make it clear that your fork does not belong to the Apache Flink
> project. (Trademarks and all that)
> Neither should be a significant hurdle (there should also be plenty of online
> resources regarding 1), and if you do this then you can freely share your
> fork with others.
> 
> I've also pinged Martijn to take a look at this thread.
> To my knowledge the project hasn't decided anything yet.
> 
> On 27/05/2023 04:05, Galen Warren wrote:
>> Ok, I get it. No interest.
>> 
>> If this project is being abandoned, I guess I'll work with my own fork. Is
>> there anything I should consider here? Can I share it with other people who
>> use this project?
>> 
>> On Tue, May 16, 2023 at 10:50 AM Galen Warren  
>> 
>> wrote:
>> 
>>> Hi Martijn, since you opened this discussion thread, I'm curious what your
>>> thoughts are in light of the responses? Thanks.
>>> 
>>> On Wed, Apr 19, 2023 at 1:21 PM Galen Warren  
>>> 
>>> wrote:
>>> 
 I use Apache Flink for stream processing, and StateFun as a hand-off
> point for the rest of the application.
> It serves well as a bridge between a Flink Streaming job and
> micro-services.
 
 This is essentially how I use it as well, and I would also be sad to see
 it sunsetted. It works well; I don't know that there is a lot of new
 development required, but if there are no new Statefun releases, then
 Statefun can only be used with older Flink versions.
 
 On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos <
 mvillalo...@kineteque.com > wrote:
 
> I am currently using Stateful Functions in my application.
> 
> I use Apache Flink for stream processing, and StateFun as a hand-off
> point for the rest of the application.
> It serves well as a bridge between a Flink Streaming job and
> micro-services.
> 
> I would be disappointed if StateFun was sunsetted. It's a good idea.
> 
> If there is anything I can do to help, as a contributor perhaps, please
> let me know.
> 
>> On Apr 3, 2023, at 2:02 AM, Martijn Visser  
>> 
> wrote:
>> Hi everyone,
>> 
>> I want to open a discussion on the status of the Statefun Project [1]
> in Apache Flink. As you might have noticed, there hasn't been much
> development over the past months in the Statefun repository [2]. There is
> currently a lack of active contributors and committers who are able to 
> help
> with the maintenance of the project.
>> In order to improve the situation, we need to solve the lack of
> committers and the lack of contributors.
>> On the lack of committers:
>> 
>> 1. Ideally, there are some of the current Flink committers who have
> the bandwidth and can help with reviewing PRs and merging them.
>> 2. If that's not an option, it could be a consideration that current
> committers only approve and review PRs, that are approved by those who are
> willing to contribute to Statefun and if the CI passes
>> On the lack of contributors:
>> 
>> 3. Next to having this discussion on the Dev and User mailing list, we
> can also create a blog with a call for new contributors on the Flink
> project website, send out some tweets on the Flink / Statefun twitter
> accounts, post messages on Slack etc. In that message, we would inform how
> those that are interested in contributing can start and where they could
> reach out for more information.
>> There's also option 4. where a group of interested people would split
> Statefun from the Flink project and make it a separate top level project
> under the Apache Flink umbrella (similar as recently has happened with
> Flink Table Store, which has become Apache Paimon).
>> If we see no improvements in the coming period, we should consider
> sunsetting Statefun and communicate that clearly to the users.
>> I'm looking forward to your thoughts.
>> 
>> Best regards,
>> 
>> Martijn
>> 
>> [1] https://nightlies.apache.org/flink/flink-statefun-docs-master/ 
>> 

[jira] [Created] (FLINK-32298) The outputQueueSize is negative

2023-06-09 Thread Rui Fan (Jira)
Rui Fan created FLINK-32298:
---

 Summary: The outputQueueSize is negative 
 Key: FLINK-32298
 URL: https://issues.apache.org/jira/browse/FLINK-32298
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.18.0
Reporter: Rui Fan
Assignee: Rui Fan
 Attachments: image-2023-06-09-17-27-46-429.png

h1. Background

The outputQueueSize indicates `The real size of queued output buffers in 
bytes.`, so it shouldn't be negative. However, it may be negative in some cases.
h2. How is outputQueueSize generated?

TotalWrittenBytes: *_BufferWritingResultPartition#totalWrittenBytes_* records
how much data is written to the ResultPartition.

TotalSentNumberOfBytes: *_PipelinedSubpartition#totalNumberOfBytes_* records
how much data is sent downstream.

The outputQueueSize = TotalWrittenBytes - TotalSentNumberOfBytes.
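As a hypothetical illustration of how the metric goes negative: if 100 bytes are
written through the BufferWritingResultPartition but 120 bytes are counted as sent
(the extra 20 bytes being events written directly to the subpartition, as listed
below), then outputQueueSize = 100 - 120 = -20.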
h1. Bug

The TotalSentNumberOfBytes may be larger than TotalWrittenBytes because some
data is written to the PipelinedSubpartition without going through the
BufferWritingResultPartition, such as:
 # PipelinedSubpartition#finishReadRecoveredState writes the
`EndOfChannelStateEvent` even if unaligned checkpoints are disabled
 # PipelinedSubpartition#addRecovered writes channel state (if the job recovers
from an unaligned checkpoint, the outputQueueSize is totally wrong)
 # PipelinedSubpartition#finish writes the `EndOfPartitionEvent`

!image-2023-06-09-17-27-46-429.png|width=1033,height=296!

 
h1. Solution

The PipelinedSubpartition should be written through the BufferWritingResultPartition,
and all writes should be counted.

 

By the way, outputQueueSize doesn't matter much because it's just a metric; it
doesn't affect data processing. I found this bug because some of our Flink
scenarios need to use adaptive rebalance (FLINK-31655), which I'm developing in
our internal version and which relies on a correct outputQueueSize to select the
low-pressure channel.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-09 Thread Paul Lam
Hi Mason,

I get your point. I'm increasingly feeling the need to introduce a built-in
file distribution mechanism for flink-kubernetes module, just like Spark 
does with `spark.kubernetes.file.upload.path` [1].

I’m assuming the workflow is as follows:

- KubernetesClusterDescriptor uploads all local resources to remote
  storage via a Flink filesystem (skipped if the resources are already remote).
- KubernetesApplicationClusterEntrypoint downloads the resources
  and put them in the classpath during startup.

I wouldn't mind splitting it into another FLIP to ensure that everything is
done correctly.

cc'ed @Yang to gather more opinions.

[1] 
https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management

Best,
Paul Lam

> On Jun 8, 2023, at 12:15, Mason Chen wrote:
> 
> Hi Paul,
> 
> Thanks for your response!
> 
> I agree that utilizing SQL Drivers in Java applications is equally important
>> as employing them in SQL Gateway. WRT init containers, I think most
>> users use them just as a workaround. For example, wget a jar from the
>> maven repo.
>> 
>> We could implement the functionality in SQL Driver in a more graceful
>> way and the flink-supported filesystem approach seems to be a
>> good choice.
>> 
> 
> My main point is: can we solve the problem with a design agnostic of SQL
> and Stream API? I mentioned a use case where this ability is useful for
> Java or Stream API applications. Maybe this is even a non-goal to your FLIP
> since you are focusing on the driver entrypoint.
> 
> Jark mentioned some optimizations:
> 
>> This allows SQLGateway to leverage some metadata caching and UDF JAR
>> caching for better compiling performance.
>> 
> It would be great to see this even outside the SQLGateway (i.e. UDF JAR
> caching).
> 
> Best,
> Mason
> 
> On Wed, Jun 7, 2023 at 2:26 AM Shengkai Fang  wrote:
> 
>> Hi. Paul.  Thanks for your update and the update makes me understand the
>> design much better.
>> 
>> But I still have some questions about the FLIP.
>> 
>>> For SQL Gateway, only DMLs need to be delegated to the SQL server
>>> Driver. I would think about the details and update the FLIP. Do you have
>> some
>>> ideas already?
>> 
>> If the application mode cannot support library mode, I think we should
>> only execute INSERT INTO and UPDATE/DELETE statements in the application
>> mode. AFAIK, we cannot support ANALYZE TABLE and CALL PROCEDURE
>> statements. The ANALYZE TABLE syntax needs to register the statistics to the
>> catalog after the job finishes, and the CALL PROCEDURE statement doesn't
>> generate an ExecNodeGraph.
>> 
>> * Introduce storage via option `sql-gateway.application.storage-dir`
>> 
>> If we cannot support submitting the jars through web submission, +1 to
>> introducing the options to upload the files. However, I think the uploader
>> should be responsible for removing the uploaded jars. Can we remove the jars
>> while the job is running or when the gateway exits?
>> 
>> * JobID is not available
>> 
>> Can we use the rest client returned by ApplicationDeployer to query the job
>> ID? I am concerned that users don't know which job is related to the
>> submitted SQL.
>> 
>> * Do we need to introduce a new module named flink-table-sql-runner?
>> 
>> It seems we need to introduce a new module. Will the new module be
>> available in the distribution package? I agree with Jark that we don't need
>> to introduce this for Table API users, since these users have their own main
>> class. If we want to make it easier for users of the k8s operator, I think
>> we should modify the k8s operator repo. If we don't need to support SQL
>> files, can we make this jar only visible in the sql-gateway, like we do in
>> the planner loader?[1]
>> 
>> [1]
>> 
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java#L95
>> 
>> Best,
>> Shengkai
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
Weihua Hu wrote on Wed, Jun 7, 2023 at 10:52:
>> 
>>> Hi,
>>> 
>>> Thanks for updating the FLIP.
>>> 
>>> I have two cents on the distribution of SQLs and resources.
>>> 1. Should we support a common file distribution mechanism for k8s
>>> application mode?
>>>  I have seen some issues and requirements on the mailing list.
>>>  In our production environment, we implement the download command in the
>>> CliFrontend and automatically add an init container to the pod for file
>>> downloading. The advantage of this
>>>  is that we can use all Flink-supported file systems to store files.
>>> 
>>>  This need more discussion. I would appreciate hearing more opinions.
>>> 
>>> 2. In this FLIP, we distribute files in two different ways in YARN and
>>> Kubernetes. Can we combine them into one way?
>>>  If we don't want to implement a common file distribution for the k8s
>>> application mode, could we use the SQLDriver
>>>  to download the files in both YARN and K8s? IMO, this can reduce the cost
>>> of code maintenance.
>>> 
>>> Best,
>>> Weihua
>>> 
>>> 
>>> On Wed, Jun 

Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-06-09 Thread yuxia
Thanks all again.
Then, I'll start to abandon the fallback behavior for the Hive dialect in Flink
1.18.
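For users who relied on the fallback, switching dialects becomes explicit. A
minimal SQL client sketch, based on the `table.sql-dialect` option referenced in
the quoted discussion below:

Flink SQL> SET 'table.sql-dialect' = 'hive';    -- Hive syntax only; parse errors no longer fall back
Flink SQL> SET 'table.sql-dialect' = 'default'; -- switch back for Flink-specific statements such as CREATE CATALOG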

Best regards,
Yuxia

- Original Message -
From: "yuxia" 
To: "dev" 
Sent: Thursday, June 8, 2023, 11:53:32 AM
Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

Thanks all for the discussion.
Since we have reached consensus about abandoning the fallback behavior for the Hive
dialect, if there's no other concern or objection, I'll close this discussion
tomorrow (6/9).


Best regards,
Yuxia

- Original Message -
From: "Martijn Visser" 
To: "dev" 
Sent: Monday, June 5, 2023, 8:46:14 PM
Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

+1 for anything that helps us with externalizing the Hive connector :D

On Thu, Jun 1, 2023 at 3:34 PM Lincoln Lee  wrote:

> +1, thanks yuxia for driving the hive decoupling work!
> Since the 1.16 release, the compatibility of Hive queries has reached a
> relatively high level, so it is time to abandon the internal fallback,
> which will make the behavior of the Hive dialect clearer.
>
> Best,
> Lincoln Lee
>
>
> Jark Wu  于2023年6月1日周四 21:23写道:
>
> > +1, I think this can make the grammar more clear.
> > Please remember to add a release note once the issue is finished.
> >
> > Best,
> > Jark
> >
> > On Thu, 1 Jun 2023 at 11:28, yuxia  wrote:
> >
> > > Hi, Jingsong. It's hard to provide an option given that we also want to
> > > decouple Hive from the Flink planner.
> > > If we still need this fallback behavior, HiveParser will still depend on
> > > `ParserImpl` provided by flink-table-planner.
> > > But to minimize the impact on users and be more user-friendly, the error
> > > message will remind users that they can use `set table.sql-dialect =
> > > default` to switch to Flink's default dialect when HiveParser fails to
> > > parse the sql.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Jingsong Li" 
> > > To: "Rui Li" 
> > > Cc: "dev" , "yuxia" ,
> > > "User" 
> > > Sent: Tuesday, May 30, 2023 3:21:56 PM
> > > Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's
> > > default dialect
> > >
> > > +1, the fallback looks weird now, it is outdated.
> > >
> > > But, it is good to provide an option. I don't know if there are some
> > > users who depend on this fallback.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
> > > >
> > > > +1, the fallback was just intended as a temporary workaround to run
> > > catalog/module related statements with hive dialect.
> > > >
> > > > On Mon, May 29, 2023 at 3:59 PM Benchao Li 
> > wrote:
> > > >>
> > > >> Big +1 on this, thanks yuxia for driving this!
> > > >>
> > > >> yuxia  于2023年5月29日周一 14:55写道:
> > > >>
> > > >> > Hi, community.
> > > >> >
> > > >> > I want to start the discussion about Hive dialect shouldn't fall
> > back
> > > to
> > > >> > Flink's default dialect.
> > > >> >
> > > >> > Currently, when the HiveParser fails to parse the sql in Hive
> > > >> > dialect, it'll fall back to Flink's default parser[1] to handle
> > > >> > flink-specific statements like "CREATE CATALOG xx with (xx);".
> > > >> >
> > > >> > As I'm involved with the Hive dialect and have had some
> > > >> > communication with community users who use the Hive dialect
> > > >> > recently, I'm thinking of throwing an exception directly instead of
> > > >> > falling back to Flink's default dialect when we fail to parse the
> > > >> > sql in Hive dialect.
> > > >> >
> > > >> > Here're some reasons:
> > > >> >
> > > >> > First of all, it'll hide some errors with the Hive dialect. For
> > > >> > example, we found we couldn't use the Hive dialect any more with
> > > >> > the Flink sql client in the release validation phase[2]; finally we
> > > >> > found that a modification in the Flink sql client caused it, but our
> > > >> > test case couldn't catch it earlier because, although HiveParser
> > > >> > failed to parse it, it then fell back to the default parser and the
> > > >> > test case passed successfully.
> > > >> >
> > > >> > Second, conceptually, the Hive dialect should have nothing to do
> > > >> > with Flink's default dialect. They are two totally different
> > > >> > dialects. If we do need a dialect mixing the Hive dialect and the
> > > >> > default dialect, maybe we need to propose a new hybrid dialect and
> > > >> > announce the hybrid behavior to users.
> > > >> > Also, the fallback behavior has confused some users; this comes
> > > >> > from questions I have been asked by community users. Throwing an
> > > >> > exception directly when failing to parse the sql statement in Hive
> > > >> > dialect will be more intuitive.
> > > >> >
> > > >> > Last but not least, it's important to decouple Hive from the Flink
> > > >> > planner[3] before we can externalize the Hive connector[4]. If we
> > > >> > still fall back to Flink's default dialect, then we will need to
> > > >> > depend on `ParserImpl` in the Flink planner, which will 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-09 Thread Jing Ge
Hi Aitozi,

Thanks for the feedback. Looking forward to the performance tests.

Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
lookup function is used to enrich column(s) from the dimension table. If
there is more than one row for a given key, there is no way to know which
row will be used to enrich the key.

[1]
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
[2]
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196

Best regards,
Jing

On Fri, Jun 9, 2023 at 5:18 AM Aitozi  wrote:

> Hi Jing
> Thanks for your good questions. I have updated the example to the FLIP.
>
> > Only one row for each lookup
> A lookup can also return multiple rows, based on the query result. [1]
>
> [1]:
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
>
> > If we use async calls with lateral join, my gut feeling is
> that we might have many more async calls than lookup join. I am not really
> sure if we will be facing potential issues in this case or not.
>
> IMO, the work pattern is similar to the lookup function: for each row from
> the left table, the eval method is evaluated once, so the number of async
> calls will not change, and the maximum number of calls in flight is limited
> by the async operator's buffer capacity, which is controlled by the option.
>
> BTW, for the naming of these options, I updated the FLIP about this; you can
> refer to the sections "ConfigOption" and "Rejected Alternatives".
>
> In the end, for the performance evaluation, I'd like to do some tests and
> will update it to the FLIP doc
>
> Thanks,
> Aitozi.
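
For readers of the thread, a minimal sketch of what such a user-defined async
table function could look like, modeled on the existing AsyncTableFunction base
class (how FLIP-313 finally exposes registration and options may differ; the
remote call is simulated here rather than using a real RPC client):

```java
// Sketch only: the remote lookup is simulated with supplyAsync; a real function
// would typically hold an async client opened in open()/closed in close().
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

public class AsyncUserEnrich extends AsyncTableFunction<Row> {

    // Called once per row of the left table; completing the future emits the result rows.
    public void eval(CompletableFuture<Collection<Row>> result, String userId) {
        CompletableFuture
                .supplyAsync(() -> fetchFromRemote(userId)) // stand-in for an async RPC
                .whenComplete((row, err) -> {
                    if (err != null) {
                        result.completeExceptionally(err);
                    } else {
                        result.complete(Collections.singleton(row));
                    }
                });
    }

    private static Row fetchFromRemote(String userId) {
        // Placeholder for the actual remote lookup.
        return Row.of(userId, "some-enriched-value");
    }
}
```

Registered (for example) as `async_enrich`, it would then be used like an
ordinary table function in a lateral join, e.g. `SELECT ... FROM t, LATERAL
TABLE(async_enrich(t.user_id))`, with the number of in-flight calls bounded by
the buffer-capacity option discussed above.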
>
>
> Jing Ge  于2023年6月9日周五 07:23写道:
>
> > Hi Aitozi,
> >
> > Thanks for the clarification. The code example looks interesting. I would
> > suggest adding them into the FLIP. The description with code examples
> will
> > help readers understand the motivation and how to use it. Afaiac, it is a
> > valid feature for Flink users.
> >
> > As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME
> > AS OF, which is also used in your code example. Temporal join performs
> > the lookup based on the processing-time match. Only one row for each
> > lookup (afaiu, I need to check the source code to double confirm) will be
> > returned for further enrichment. On the other hand, lateral join will have
> > sub-queries correlated with every individual value of the reference table
> > from the preceding part of the query and each sub query will return
> > multiple rows. If we use async calls with lateral join, my gut feeling is
> > that we might have many more async calls than lookup join. I am not
> really
> > sure if we will be facing potential issues in this case or not. Possible
> > issues I can think of now are e.g. too many RPC calls, too many async calls
> > processing, the sub query will return a table which might be (too) big,
> and
> > might cause performance issues. I would suggest preparing some use cases
> > and running some performance tests to check it. These are my concerns
> about
> > using async calls with lateral join and I'd like to share with you, happy
> > to discuss with you and hear different opinions, hopefully the
> > discussion could help me understand it more deeply. Please correct me if
> I
> > am wrong.
> >
> > Best regards,
> > Jing
> >
> >
> > On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:
> >
> > > Hi Mason,
> > > Thanks for your input. I think if we support the user-defined async
> > > table function, users will be able to use it to hold a batch of data and
> > > then handle it all at once in the customized function.
> > >
> > > AsyncSink is meant for the sink operator. I have not figured out how to
> > > integrate it in this case.
> > >
> > > Thanks,
> > > Atiozi.
> > >
> > >
> > > Mason Chen  于2023年6月8日周四 12:40写道:
> > >
> > > > Hi Aitozi,
> > > >
> > > > I think it makes sense to make it easier for SQL users to make RPCs.
> Do
> > > you
> > > > think your proposal can extend to the ability to batch data for the
> > RPC?
> > > > This is also another common strategy to increase throughput. Also,
> have
> > > you
> > > > considered solving this a bit differently by leveraging Flink's
> > > AsyncSink?
> > > >
> > > > Best,
> > > > Mason
> > > >
> > > > On Mon, Jun 5, 2023 at 1:50 AM Aitozi  wrote:
> > > >
> > > > > One more thing for discussion:
> > > > >
> > > > > In our internal implementation, we reuse the option
> > > > > `table.exec.async-lookup.buffer-capacity` and
> > > > > `table.exec.async-lookup.timeout` to configure
> > > > > the async udtf. Do you think we should add two extra options to
> > > > > distinguish them from the lookup option 

[jira] [Created] (FLINK-32297) Use Temurin image in FlinkImageBuilder

2023-06-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32297:


 Summary: Use Temurin image in FlinkImageBuilder
 Key: FLINK-32297
 URL: https://issues.apache.org/jira/browse/FLINK-32297
 Project: Flink
  Issue Type: Sub-task
  Components: Test Infrastructure
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


The FlinkImageBuilder currently uses openjdk images. I've seen issues with 
these on Java 17, and propose to use Temurin, similar to the prod images.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32296) Flink SQL handle array of row incorrectly

2023-06-09 Thread Lim Qing Wei (Jira)
Lim Qing Wei created FLINK-32296:


 Summary: Flink SQL handle array of row incorrectly
 Key: FLINK-32296
 URL: https://issues.apache.org/jira/browse/FLINK-32296
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.16.2, 1.15.3
Reporter: Lim Qing Wei


Flink SQL produces an incorrect result when involving data with type of ARRAY; 
here's a reproduction:

 

 
{code:java}
CREATE TEMPORARY VIEW bug_data as (
SELECT CAST(ARRAY[
(10, '2020-01-10'), (101, '244ddf'), (1011, '2asdfaf'), (1110, '200'), (2210, 
'20-01-10'), (4410, '2')
] AS ARRAY>)
UNION
SELECT CAST(ARRAY[
(10, '2020-01-10'), (121, '244ddf'), (, '2asdfaf'), (32243, '200'), (2210, 
'3-01-10'), (4410, '23243243')
] AS ARRAY>)
UNION SELECT CAST(ARRAY[
(10, '2020-01-10'), (222, '244ddf'), (1011, '2asdfaf'), (1110, '200'), (24367, 
'20-01-10'), (4410, '2')
] AS ARRAY>)
UNION SELECT CAST(ARRAY[
(10, '2020-01-10'), (5666, '244ddf'), (435243, '2asdfaf'), (56567, '200'), 
(2210, '20-01-10'), (4410, '2')
] AS ARRAY>)
UNION SELECT CAST(ARRAY[
(10, '2020-01-10'), (43543, '244ddf'), (1011, '2asdfaf'), (1110, '200'), 
(8967564, '20-01-10'), (4410, '2')
] AS ARRAY>)
);

CREATE TABLE sink (
r ARRAY>
) WITH ('connector' = 'print'); {code}
 

 

In both 1.15 and 1.16, it produces the following:

 
{noformat}
[+I[4410, 2], +I[4410, 2], +I[4410, 2], +I[4410, 2], +I[4410, 
2], +I[4410, 2]]

[+I[4410, 23243243], +I[4410, 23243243], +I[4410, 23243243], +I[4410, 
23243243], +I[4410, 23243243], +I[4410, 23243243]]{noformat}
 

 

I think this is unexpected/wrong because:
 # The query should produce 5 rows, not 2
 # The data is also wrong; notice it just makes every row in the array the 
same, but the inputs are not the same.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32295) Try out Infra-provided Gradle Enterprise

2023-06-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32295:


 Summary: Try out Infra-provided Gradle Enterprise
 Key: FLINK-32295
 URL: https://issues.apache.org/jira/browse/FLINK-32295
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System / CI
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


Infra has a Gradle Enterprise instance that can be used for Github Action 
branch builds (not PRs). We could try this out in one of the connector repos to 
see if it provides value to us; if so rolling this out to all 
connector/auxiliary repos could be interesting.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-09 Thread liu ron
Hi, Mang

Thanks for your update, the FLIP looks good to me now.

Best,
Ron

Mang Zhang  于2023年6月9日周五 12:08写道:

> Hi Ron,
> Thanks for your reply!
> After our offline discussion: at present, there may be many Flink jobs
> using the non-atomic CTAS function, especially streaming jobs.
> If we only infer whether atomic CTAS is supported based on whether the
> DynamicTableSink implements the SupportsStaging interface,
> then after users upgrade to a new version, Flink's behavior will change,
> which is not production friendly.
> In order to ensure the consistency of Flink's behavior, and to give users
> maximum flexibility, even if the DynamicTableSink implements the
> SupportsStaging interface, users can still choose the non-atomic
> implementation according to business needs.
>
> I have updated FLIP-305[1].
>
> Looking forward to more feedback, if there is no other feedback, I will
> launch a vote next Monday(2023-06-12).
> Thanks again!
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
>
> --
>
> Best regards,
>
> Mang Zhang
>
>
>
> At 2023-06-09 10:23:21, "liu ron"  wrote:
> >Hi, Mang
> >
> >In FLIP-214, we have discussed that atomicity is not needed in streaming
> >mode, so we have implemented the initial version that doesn't support
> >atomicity. In addition, we introduced the option
> >"table.ctas.atomicity-enabled" to enable the atomic ability. According to
> >your FLIP-305 description, once the DynamicTableSink implements the
> >SupportsStaging interface, atomicity is the default behavior whether in
> >stream mode or batch mode, and the user can't change it. I think this is
> >not feasible for streaming mode; the atomicity should be controllable by
> >the user. So I think we should clarify the atomicity behavior combining the
> >option and the SupportsStaging interface in the FLIP: only if the
> >DynamicTableSink implements SupportsStaging and the option is enabled is
> >atomicity enabled. WDYT?
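
As a purely illustrative summary of the behavior being converged on here
(option name as used in this thread; the sink must additionally implement
SupportsStaging for the atomic path to apply):

```sql
-- Non-atomic CTAS stays the default; atomicity only applies when the option
-- below is enabled AND the DynamicTableSink implements SupportsStaging.
SET 'table.ctas.atomicity-enabled' = 'true';

CREATE TABLE target_table AS SELECT id, name FROM source_table;
```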
> >
> >Best,
> >Ron
> >
> >Jark Wu  于2023年6月8日周四 16:30写道:
> >
> >> Thank you for the great work, Mang! The updated proposal looks good to me.
> >>
> >> Best,
> >> Jark
> >>
> >> > 2023年6月8日 11:49,Jingsong Li  写道:
> >> >
> >> > Thanks Mang for updating!
> >> >
> >> > Looks good to me!
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang  wrote:
> >> >>
> >> >> Hi Jingsong,
> >> >>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >> >>> Flink design places execution in the TableFactory or directly in the
> >> >>> Catalog, so introducing an executable table makes me feel a bit
> >> >>> strange. (Spark is this style, but Flink may not be)
> >> >> On this issue, introducing the executable commit/abort logic on
> >> >> CatalogTable is indeed a bit strange.
> >> >> After an offline discussion with yuxia, I tweaked the FLIP-305[1]
> >> scenario.
> >> >> The new solution is similar to the implementation of SupportsOverwrite,
> >> >> which introduces the SupportsStaging interface and infers whether
> >> DynamicTableSink supports atomic ctas based on whether it implements the
> >> SupportsStaging interface,
> >> >> and if so, it will get the StagedTable object from DynamicTableSink.
> >> >>
> >> >> For more implementation details, please see the FLIP-305 document.
> >> >>
> >> >> This is my poc commits
> >> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
> >> >>
> >> >>
> >> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >>
> >> >>
> >> >> --
> >> >>
> >> >> Best regards,
> >> >>
> >> >> Mang Zhang
> >> >>
> >> >>
> >> >>
> >> >> At 2023-05-12 13:02:14, "Jingsong Li"  wrote:
> >> >>> Hi Mang,
> >> >>>
> >> >>> Thanks for starting this FLIP.
> >> >>>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >> >>> Flink design places execution in the TableFactory or directly in the
> >> >>> Catalog, so introducing an executable table makes me feel a bit
> >> >>> strange. (Spark is this style, but Flink may not be)
> >> >>>
> >> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >> >>>
> >> >>> Best,
> >> >>> Jingsong
> >> >>>
> >> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang  wrote:
> >> 
> >>  Hi Ron,
> >> 
> >> 
> >>  First of all, thank you for your reply!
> >>  After our offline communication, what you said is mainly in the
> >> compilePlan scenario, but currently compilePlanSql does not support non
> >> INSERT statements, otherwise it will throw an exception.
> >> > Unsupported SQL query! compilePlanSql() only accepts a single SQL
> >> statement of type INSERT
> >>  But it's a good point that I will seriously consider.
> >>  Non-atomic CTAS can be supported relatively easily;
> >>  But atomic CTAS needs more adaptation work, so I'm going to leave it
> >> as is and follow up with a 

Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-06-09 Thread Hang Ruan
Hi, Feng.

I reviewed this FLIP again and found a few small places that may need to be
optimized.

1. `CatalogDescriptor` may need a private constructor.
2. The method `initCatalog` in `CatalogManager` is not introduced.

Best,
Hang

Feng Jin  于2023年6月6日周二 21:17写道:

> Hi Leonard,
>
> Thanks for your reply.
>
> > 1. a  How to construct a CatalogDescriptor ?
>
> I think it would be helpful to add a method for constructing a
> CatalogDescriptor, as you mentioned in 1.c. I will update the documentation
> later.
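
For illustration, constructing and registering a catalog through the proposed
descriptor might end up looking roughly like this; the `of(...)` factory and
`createCatalog(...)` follow the naming suggested in this thread and could still
change as the FLIP evolves:

```java
// Sketch only: CatalogDescriptor.of(...) and TableEnvironment#createCatalog(...)
// follow the naming suggested in this thread; package and signatures may differ.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogDescriptor; // proposed class, per the FLIP draft

public final class CatalogStoreExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        Configuration options = new Configuration();
        options.setString("type", "generic_in_memory");

        // The descriptor only carries the name and options; the catalog is not
        // instantiated at this point (lazy initialization).
        CatalogDescriptor descriptor = CatalogDescriptor.of("my_catalog", options);

        // Persisted via the configured CatalogStore and initialized on first access.
        tableEnv.createCatalog("my_catalog", descriptor);
    }
}
```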
>
> > 1.b  How to access the fields? Could we use Configuration instead of
> > Map<String, String>?
>
> I believe that the use of Map<String, String> options is only intended for
> creating a catalog and not for accessing internal parameters.
>
> Since all of the relevant parameters for CREATE CATALOG are also stored in
> Map<String, String> options, my understanding is that using
> Map<String, String> options should suffice.
>
> Here is the implementation of execute CREATE CATALOG statement.
> ```java
> private TableResultInternal createCatalog(CreateCatalogOperation operation)
> {
> String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
> try {
> String catalogName = operation.getCatalogName();
> Map<String, String> properties = operation.getProperties();
>
> Catalog catalog =
> FactoryUtil.createCatalog(
> catalogName,
> properties,
> tableConfig,
> resourceManager.getUserClassLoader());
> catalogManager.registerCatalog(catalogName, catalog);
>
> return TableResultImpl.TABLE_RESULT_OK;
> } catch (CatalogException e) {
> throw new ValidationException(exMsg, e);
> }
> }
> ```
>
>
> >  2. Do we have a plan to offer a default CatalogStore if the user didn't
> > configure this?
>
> Yes, the in-memory catalogStore will be used as the default CatalogStore
> even if the user has not configured one
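
If a user does want to switch away from the in-memory default, the selection
would presumably go through the `table.catalog-store.kind` option mentioned in
this thread; a sketch of a flink-conf.yaml entry (the concrete kind names and
any store-specific options, e.g. a file path, are up to the FLIP):

```
# Illustrative only: 'file' and any file-store-specific options are assumptions here.
table.catalog-store.kind: file
```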
>
>
> Best,
> Feng
>
>
> On Tue, Jun 6, 2023 at 8:02 PM Leonard Xu  wrote:
>
> > Hi, Feng
> >
> > Sorry for reply late, but I’ve some comments about the FLIP
> >
> >
> > 1. The introduced public class CatalogDescriptor seems to be missing some
> > necessary components
> >   a) How to construct a CatalogDescriptor ?
> >   b) How to access the fields ? Could we use Configuration instead of
> > Map<String, String> ?
> >   c) Could we offer a built-in factory method to build a
> CatalogDescriptor
> > like
> >  public static CatalogDescriptor of(String catalogName, Configuration
> > configuration)
> >
> > 2. The FLIP said “By default, there are two built-in CatalogStores
> > available: the In-Memory CatalogStore and the File CatalogStore”.
> > Do we have a plan to offer a default CatalogStore if the user didn't
> > configure this? IIUC, users can obtain the benefits of lazy catalog
> > initialization if we have a default catalog store, even if it is
> > in-memory.
> >
> > Best,
> > Leonard
> >
> >
> >
> > > On Jun 6, 2023, at 7:08 PM, Feng Jin  wrote:
> > >
> > > Hi everyone,
> > >
> > > Thank you everyone for your valuable input. If there are no further
> > > questions or concerns about the FLIP[1], I would like to start voting
> > > tomorrow (6/7).
> > >
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > >
> > >
> > > Best,
> > > Feng
> > >
> > >
> > > On Sun, Jun 4, 2023 at 3:33 PM Feng Jin  wrote:
> > >
> > >> Hi Samrat,
> > >>
> > >> Thanks for your advice.
> > >>
> > >>> 1. The createCatalog method does not mention any exceptions being
> > >> thrown.
> > >>
> > >> CreateCatalog will throw CatalogException like registerCatalog.  As
> > >> CatalogException is a RuntimeException,
> > >> there is no explicit declaration of throwing Exceptions in
> > CatalogManager
> > >> and TableEnvironment.
> > >> To avoid misunderstandings, I have added the "throw CatalogException"
> > flag
> > >> to the createCatalog method definition of CatalogStore.
> > >>
> > >>> 2. Could you please provide an exhaustive list of the supported
> kinds?
> > >>
> > >> Sure,  the documentation now includes the configuration of the
> built-in
> > >> CatalogStore as well as how to configure a custom CatalogStore.
> > >>
> > >>
> > >> Best,
> > >> Feng
> > >>
> > >>
> > >> On Sun, Jun 4, 2023 at 4:23 AM Samrat Deb 
> > wrote:
> > >>
> > >>> Hi Feng,
> > >>>
> > >>> Thank you for providing the proposal. I believe this feature will be
> > >>> highly
> > >>> valuable.
> > >>>
> > >>> I have a couple of inquiries:
> > >>>
> > >>> 1. According to the documentation [1], the createCatalog method does
> > not
> > >>> mention any exceptions being thrown. However, I would like to confirm
> > if
> > >>> it
> > >>> is always true that there will be no failures in all scenarios.
> Please
> > let
> > >>> me know if there is any additional information I may have missed.
> > >>>
> > >>> 2. Regarding the registration process using the
> > `table.catalog-store.kind`
> > >>> configuration, could you please provide an exhaustive list of the
> > >>> supported
> > >>> kinds?
> > >>>   It would be great to have a comprehensive understanding of the
> >