[jira] [Created] (FLINK-32459) Force set the parallelism of SocketTableSource to 1

2023-06-27 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-32459:
--

 Summary: Force set the parallelism of SocketTableSource to 1
 Key: FLINK-32459
 URL: https://issues.apache.org/jira/browse/FLINK-32459
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.18.0
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 1.18.0


SocketSource can only work with a parallelism of 1. It is best to force the 
parallelism to 1 when the source is loaded in DynamicTableSource.
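
For illustration, a hedged sketch of one way a DynamicTableSource could pin the runtime 
parallelism to 1. This is not the actual FLINK-32459 patch; it assumes the method lives in a 
ScanTableSource implementation and that SocketSourceFunction, hostname, port, and deserializer 
are placeholder names:
{code}
// Hedged sketch: return a DataStreamScanProvider and call setParallelism(1)
// on the produced stream, since a socket can only be consumed by one subtask.
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    return new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(
                ProviderContext providerContext, StreamExecutionEnvironment env) {
            return produceDataStream(env);
        }

        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
            return env.addSource(new SocketSourceFunction(hostname, port, deserializer))
                    .setParallelism(1); // force a single reading subtask
        }

        @Override
        public boolean isBounded() {
            return false;
        }
    };
}
{code}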



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Flink Mongodb Sink - Issues

2023-06-27 Thread Leonard Xu
Hi Harish

Jiabao has helped troubleshoot the issue[1] and fixed it very efficiently in less 
than 24 hours. Thanks, Jiabao!

You can build the mongodb connector based on the latest main branch, or you can wait 
for the next connector release.


Best,
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-32446 




> On Jun 27, 2023, at 7:24 PM, Jiabao Sun  
> wrote:
> 
> Hi Harish,
> 
> Thanks for reporting this issue. There are currently 5 write modes:
> 
> 1. Flush only on checkpoint
> 'sink.buffer-flush.interval' = '-1' and 'sink.buffer-flush.max-rows' = '-1'
> 
> 2. Flush for every single element
> 'sink.buffer-flush.interval' = '0' or 'sink.buffer-flush.max-rows' = '1'
> 
> 3. Flush when the time interval exceeds the limit
> 'sink.buffer-flush.interval' > '0' and 'sink.buffer-flush.max-rows' = '-1'
> 
> 4. Flush when the batch size exceeds the limit
> 'sink.buffer-flush.max-rows' > '1' and 'sink.buffer-flush.interval' = '-1'
> 
> 5. Flush when the time interval or batch size exceeds the limit
> 'sink.buffer-flush.interval' > '0' and 'sink.buffer-flush.max-rows' > '1'
> 
> The default write mode is 5, with 'sink.buffer-flush.interval' = '1s' and 
> 'sink.buffer-flush.max-rows' = '1000'.
> Whether the buffered data is flushed to MongoDB is decided at checkpoint time or 
> when the next record comes in.
> Currently, there is no separate thread that regularly checks whether the 
> interval since the last written record exceeds the limit. 
> Therefore, when checkpointing is not enabled, writes can be delayed.
> 
> You can try to enable checkpointing or use the single-record write mode (mode 2) to 
> avoid this problem.
> 
> 
> Best,
> Jiabao
> 
> 
> 
>> On Jun 27, 2023, at 6:00 PM, 
>>  wrote:
>> 
>> Hi,
>> 
>> I am using the flink version 1.7.1 and flink-mongodb-sql-connector version 
>> 1.0.1-1.17.
>> 
>> Below is the data pipeline flow.
>> 
>> Source 1 (Kafka topic using Kafka connector) -> Window Aggregation  (legacy 
>> grouped window aggregation) -> Sink (Kafka topic using upsert-kafka 
>> connector) -> Sink(Mongdb collection).
>> 
>> 
>> I noticed a couple of issues with the mongodb-sink connector.
>> 
>> Issue 1: Mongo sink delays event write as soon as a tombstone message is 
>> received.
>> 
>> When a new key comes in, the aggregation is made and the result is written 
>> successfully to kafka topic and also to mongo db immediately.
>> 
>> Another new key comes in, the aggregation is made, the result is available 
>> in mongodb.
>> 
>> But when a key repeats for the first time, the aggregation is made, the 
>> results are written to kafka topic. You get 1 delete record and 1 latest 
>> record for the key in the topic.  The data for the key is deleted in the 
>> mongodb but, the latest record for the key is not inserted to mongodb.
>> 
>> When a new key or another existing key comes in, the previous key's latest 
>> record is inserted into mongo db.
>> 
>> The same pattern exists for subsequent records.
>> 
>> There is always a delay of one event as soon as a tombstone record is found 
>> by the connector.
>> 
>> Issue 2: Mongo sink waits for new record to write previous records.
>> 
>> I have an upsert-kafka topic that already contains some events.
>> I start a new upsert-kafka to mongo db sink job.
>> 
>> I expect all the data from the topic to be loaded to mongodb right away.
>> But instead, only the first record is written to mongo db.
>> The rest of the records don’t arrive in mongodb  until a  new event is 
>> written to kafka topic.
>> The new event that was written is delayed until the next event arrives.
>> 
>> I am not able to understand this behaviour.
>> This doesn’t feel like an expected behaviour.
>> Can someone please advise whether I am missing something or an issue exists.
>> 
>> Regards,
>> Harish.
>> 
>> 
>> 
>> 
>> 
> 



[jira] [Created] (FLINK-32458) support mixed use of JSON_OBJECTAGG & JSON_ARRAYAGG with other aggregate functions

2023-06-27 Thread lincoln lee (Jira)
lincoln lee created FLINK-32458:
---

 Summary: support mixed use of JSON_OBJECTAGG & JSON_ARRAYAGG with 
other aggregate functions
 Key: FLINK-32458
 URL: https://issues.apache.org/jira/browse/FLINK-32458
 Project: Flink
  Issue Type: Sub-task
Reporter: lincoln lee






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32457) update current documentation of JSON_OBJECTAGG/JSON_ARRAYAGG to clarify the limitation

2023-06-27 Thread lincoln lee (Jira)
lincoln lee created FLINK-32457:
---

 Summary: update current documentation of 
JSON_OBJECTAGG/JSON_ARRAYAGG to clarify the limitation
 Key: FLINK-32457
 URL: https://issues.apache.org/jira/browse/FLINK-32457
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: lincoln lee






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32456) JSON_OBJECTAGG & JSON_ARRAYAGG cannot be used with other aggregate functions

2023-06-27 Thread lincoln lee (Jira)
lincoln lee created FLINK-32456:
---

 Summary: JSON_OBJECTAGG & JSON_ARRAYAGG cannot be used with other 
aggregate functions
 Key: FLINK-32456
 URL: https://issues.apache.org/jira/browse/FLINK-32456
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.17.1, 1.16.2, 1.15.4
Reporter: lincoln lee
Assignee: lincoln lee
 Attachments: image-2023-06-28-11-32-49-862.png

FLINK-16205 & FLINK-16206 added support for the new aggregate functions 
JSON_OBJECTAGG & JSON_ARRAYAGG, but they have a limitation (not yet documented): 
they cannot be used together with other aggregate functions, e.g.,
{code}
SELECT f0, count(f1), sum(f2), JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0
{code}
will raise an internal AssertionError
{code}
java.lang.AssertionError: Cannot add expression of different type to set:
set type is RecordType(INTEGER f0, BIGINT NOT NULL EXPR$1, INTEGER EXPR$2, VARCHAR(2000) CHARACTER SET "UTF-16LE" NOT NULL EXPR$3) NOT NULL
expression type is RecordType(INTEGER f0, BIGINT NOT NULL EXPR$1) NOT NULL
set is rel#25:LogicalAggregate.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#24,group={0},EXPR$1=COUNT($1),EXPR$2=SUM($2),EXPR$3=JSON_OBJECTAGG_NULL_ON_NULL($1, $0))
expression is LogicalAggregate(group=[{0}], EXPR$1=[COUNT($3)])
  LogicalProject(f0=[$0], f1=[$1], f2=[$2], $f3=[JSON_STRING($1)])
    LogicalTableScan(table=[[default_catalog, default_database, T]])
{code} 
because the implementation rule only supports a single JSON aggregate function.

An example case running on PostgreSQL:
{code}
select b, json_object_agg(a, c), count(*), sum(a) from t1 group by b;
{code}

 !image-2023-06-28-11-32-49-862.png! 

The follow-up improvements include two parts:
1. update the current documentation to clarify the limitation
2. expand the implementation to support mixed use with other aggregate 
functions




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

2023-06-27 Thread Shammon FY
Hi Feng,

Thanks for your input.

>1. we can add a lineage interface like `supportReportLineage`

That's a very good idea, thanks very much. It can help users report
lineage for existing connectors in DataStream jobs without any additional
work. I will add this interface to the FLIP later; please help to
review it, thanks.

>2. it is relatively easy to obtain column lineage through Calcite
MetaQuery API

It's helpful that Calcite already exposes some column lineage in its metadata. I think we
can discuss this and give the proposal in the column lineage FLIP.

Best,
Shammon FY



On Wednesday, June 28, 2023, Feng Jin  wrote:

> Hi Shammon
> Thank you for proposing this FLIP. I think the Flink Job lineage is a very
> useful feature.
> I have a few questions:
>
> 1. For DataStream Jobs, users need to set up lineage relationships when
> building DAGs for their custom sources and sinks.
> However, for some common connectors such as Kafka Connector and JDBC
> Connector, we can add a lineage interface like `supportReportLineage`, so
> that these connectors can implement it.
> This way, in the scenario of DataStream Jobs, lineages can be automatically
> reported. What do you think?
>
>
> 2. From the current design, it seems that we need to analyze column lineage
> through pipeline. As far as I know, it is relatively easy to obtain column
> lineage through Calcite MetaQuery API.
> Would you consider using this approach? Or do we need to implement another
> parsing process based on the pipeline?
> ```
> RelMetadataQuery metadataQuery = relNode.getCluster().getMetadataQuery();
> metadataQuery.getColumnOrigins(inputRel, i);
> ```
> Best,
> Feng
>
>
> On Sun, Jun 25, 2023 at 8:06 PM Shammon FY  wrote:
>
> > Hi yuxia and Yun,
> >
> > Thanks for your input.
> >
> > For yuxia:
> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
> >
> > At present, we only need to notify the listener when a job goes to
> > termination, but I think it makes sense to add generic `oldStatus` and
> > `newStatus` in the listener and users can update the job state in their
> > service as needed.
> >
> > > 2: I'm really confused about the `config()` included in
> `LineageEntity`,
> > where is it from and what is it for ?
> >
> > The `config` in `LineageEntity` is used for users to get options for
> source
> > and sink connectors. As the examples in the FLIP, users can add
> > server/group/topic information in the config for kafka and create lineage
> > entities for `DataStream` jobs, then the listeners can get this
> information
> > to identify the same connector in different jobs. Otherwise, the `config`
> > in `TableLineageEntity` will be the same as `getOptions` in
> > `CatalogBaseTable`.
> >
> > > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity`
> is
> > needed or not, since `TableSinkLineageEntity` contains
> > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > changelogmode?
> >
> > At present, we do not actually use the changelog mode. It can be deleted,
> > and I have updated FLIP.
> >
> > > Btw, since there're a lot interfaces proposed, I think it'll be better
> to
> > give an example about how to implement a listener in this FLIP to make us
> > know better about the interfaces.
> >
> > I have added the example in the FLIP and the related interfaces and
> > examples are in branch [1].
> >
> > For Yun:
> > > I have one more question on the lookup-join dim tables, it seems this
> > FLIP does not touch them, and will them become part of the
> > List sources() or adding another interface?
> >
> > You're right, currently lookup join dim tables were not considered in the
> > 'proposed changed' section of this FLIP. But the interface for lineage is
> > universal and we can give `TableLookupSourceLineageEntity` which
> implements
> > `TableSourceLineageEntity` in the future without modifying the public
> > interface.
> >
> > > By the way, if you want to focus on job lineage instead of data column
> > lineage in this FLIP, why we must introduce so many column-lineage
> related
> > interface here?
> >
> > The lineage information in SQL jobs includes table lineage and column
> > lineage. Although SQL jobs currently do not support column lineage, we
> > would like to support this in the next step. So we have comprehensively
> > considered the table lineage and column lineage interfaces here, and
> > defined these two interfaces together clearly
> >
> >
> > [1]
> >
> > https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b1
> 1e82c9187c
> >
> > Best,
> > Shammon FY
> >
> >
> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang  wrote:
> >
> > > Hi Shammon,
> > >
> > > I like the idea in general and it will help to analysis the job
> lineages
> > > no matter FlinkSQL or Flink jar jobs in production environments.
> > >
> > > For Qingsheng's concern, I'd like the name of JobType more than
> > > RuntimeExecutionMode, as the latter one is not easy to understand for
> > users.
> > >
> > > I 

[jira] [Created] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-32455:
---

 Summary: Breaking change in TypeSerializerUpgradeTestBase prevents 
flink-connector-kafka from building against 1.18-SNAPSHOT
 Key: FLINK-32455
 URL: https://issues.apache.org/jira/browse/FLINK-32455
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka, Test Infrastructure
Affects Versions: 1.18.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.18.0


FLINK-27518 introduced a breaking signature change to the abstract class 
{{TypeSerializerUpgradeTestBase}}, specifically the abstract 
{{createTestSpecifications}} method signature was changed. This breaks 
downstream test code in externalized connector repos, e.g. 
flink-connector-kafka's {{KafkaSerializerUpgradeTest}}

Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
downstream test code that depends on the flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32454) deserializeStreamStateHandle of checkpoint read byte

2023-06-27 Thread Bo Cui (Jira)
Bo Cui created FLINK-32454:
--

 Summary: deserializeStreamStateHandle of checkpoint read byte
 Key: FLINK-32454
 URL: https://issues.apache.org/jira/browse/FLINK-32454
 Project: Flink
  Issue Type: Bug
Reporter: Bo Cui


During checkpoint deserialization, deserializeStreamStateHandle should read a 
byte instead of an int:

https://github.com/apache/flink/blob/c5acd8dd800dfcd2c8873c569d0028fc7d991b1c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L712
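
For illustration, a self-contained sketch (not the serializer's actual code) of why reading an 
int where only a single byte was written corrupts the rest of the stream:
{code}
import java.io.*;

public class ByteVsIntDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(3);            // type tag written as a single byte
        out.writeUTF("file:/ckp-1"); // payload that follows the tag
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        int tag = in.readByte();     // correct: consumes exactly one byte
        // int tag = in.readInt();   // wrong: would also consume 3 bytes of the payload
        System.out.println(tag + " -> " + in.readUTF());
    }
}
{code}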



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32453) flink-connector-kafka does not build against Flink 1.18-SNAPSHOT

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-32453:
---

 Summary: flink-connector-kafka does not build against Flink 
1.18-SNAPSHOT
 Key: FLINK-32453
 URL: https://issues.apache.org/jira/browse/FLINK-32453
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.18.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.18.0


There are a few breaking changes in test utility code that prevent 
{{apache/flink-connector-kafka}} from building against Flink 1.18-SNAPSHOT. 
This umbrella ticket captures all breaking changes, and should only be closed 
once we make things build again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Feature requests for Flink protobuf deserialization

2023-06-27 Thread Adam Richardson
Hi there,

My company is in the process of rebuilding some of our batch Spark-based
ETL pipelines in Flink. We use protobuf to define our schemas. One major
challenge is that Flink protobuf deserialization has some semantic
differences with the ScalaPB encoders we use in our Spark systems. This
poses a serious barrier for adoption as moving any given dataset from Spark
to Flink will potentially break all downstream consumers. I have a long
list of feature requests in this area:

   1. Support for mapping protobuf optional wrapper types (StringValue,
   IntValue, and friends) to nullable primitive types rather than RowTypes
   2. Support for mapping the protobuf Timestamp type to a real timestamp
   rather than RowType
   3. A way of defining custom mappings from specific proto types to custom
   Flink types (the previous two feature requests could be implemented on top
   of this lower-level feature)
   4. Support for nullability semantics for message types (in the status
   quo, an unset message is treated as equivalent to a message with default
   values for all fields, which is a confusing user experience)
   5. Support for nullability semantics for primitives types (in many of
   our services, the default value for a field of primitive type is treated as
   being equivalent to unset or null, so it would be good to offer this as a
   capability in the data warehouse)

Would Flink accept patches for any or all of these feature requests? We're
contemplating forking flink-protobuf internally, but it would be better if
we could just upstream the relevant changes. (To my mind, 1, 2, and 4 are
broadly applicable features that are definitely worthy of upstream support.
3 and 5 may be somewhat more specific to our use case.)

Thanks,
Adam Richardson


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-27 Thread Martijn Visser
Hi all,

Thanks for the lively and good discussion. Given the length of the
discussion, I skimmed through and then did a deep dive on the latest state
of the FLIP. I think the FLIP is overall in a good state and ready to bring
to a vote.

One thing that I did notice while skimming through the discussions is that
I think there are some follow-ups that could be worthy of a FLIP and a
discussion. For example, if/what/how does the Flink community offer any
behavioral stability guarantees, or other types of expectations and
guarantees. I also think that we must have tooling in place to implement
this FLIP (and also FLIP-196 and FLIP-197), so that we're not just
creating a policy on paper but also have the means to enforce it. Last but
not least, I echo Jark's point that we can't estimate maintenance cost without
a concrete design and code/POC. For me, that means that a contributor can
choose to propose a deviation, but that the contributor would need to
explicitly mention it in the FLIP and get it discussed/voted on as part of
the FLIP process. But the starting point is as defined in this and other
relevant FLIPs.

Best regards,

Martijn

On Tue, Jun 27, 2023 at 3:38 AM Becket Qin  wrote:

> Hi Xintong, Jark and Jing,
>
> Thanks for the reply. Yes, we can only mark the DataStream API as
> @Deprecated after the ProcessFunction API is fully functional and mature.
>
> It is a fair point that the condition of marking a @Public API as
> deprecated should also be a part of this FLIP. I just added that to the
> FLIP wiki. This is probably more of a clarification on the existing
> convention, rather than a change.
>
> It looks like we are on the same page now for this FLIP. If so, I'll start
> a VOTE thread in two days.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Jun 26, 2023 at 8:09 PM Xintong Song 
> wrote:
>
> > >
> > > Considering DataStream API is the most fundamental and complex API of
> > > Flink, I think it is worth a longer time than the general process for
> the
> > > deprecation period to wait for the new API be mature.
> > >
> >
> > This inspires me. In this specific case, compared to how long should
> > DataStream API be removed after deprecation, it's probably more important
> > to answer the question how long would ProcessFunction API become mature
> and
> > stable after being introduced. According to FLIP-197[1], it requires 4
> > minor releases by default to promote an @Experimental API to @Public. And
> > for ProcessFunction API, which aims to replace DataStream API as one of
> the
> > most fundamental API of Flink, I'd expect this to take at least the
> default
> > time, or even longer. And we probably should wait until we believe
> > ProcessFunction API is stable to mark DataStream API as deprecated,
> rather
> > than as soon as it's introduced. Assuming we introduce the
> ProcessFunction
> > API in 2.0, that means we would need to wait for 6 minor releases (4 for
> > the new API to become stable, and 2 for the migration period) to remove
> > DataStream API, which is ~2.5 year (assuming 5 months / minor release),
> > which sounds acceptable for another major version bump.
> >
> > To wrap things up, it seems to me, sadly, that anyway we cannot avoid the
> > overhead for maintaining both DataStream & ProcessFunction APIs for at
> > least 6 minor releases.
> >
> > Best,
> >
> > Xintong
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
> >
> >
> >
> > On Mon, Jun 26, 2023 at 5:41 PM Jing Ge 
> > wrote:
> >
> > > Hi all,
> > >
> > > Just want to make sure we are on the same page. There is another
> > example[1]
> > > I was aware of recently that shows why more factors need to be taken
> care
> > > of than just the migration period. Thanks Galen for your hint.
> > >
> > > To put it simply, the concern about API deprecation is not that
> > deprecated
> > > APIs have been removed too early (min migration period is required).
> The
> > > major concern is that APIs are marked as deprecated for a (too) long
> > time,
> > > much longer than the migration period discussed in this thread, afaik.
> > > Since there is no clear picture/definition, no one knows when to do the
> > > migration for users(after the migration period has expired) and when to
> > > remove deprecated APIs for Flink developers.
> > >
> > > Based on all the information I knew, there are two kinds of obstacles
> > that
> > > will and should block the deprecation process:
> > >
> > > 1. Lack of functionalities in new APIs. It happens e.g. with the
> > > SourceFunction to FLIP-27 Source migration. Users who rely on those
> > > functions can not migrate to new APIs.
> > > 2. new APIs have critical bugs. An example could be found at [1]. Users
> > > have to stick to the deprecated APIs.
> > >
> > > Since FLIP-321 is focusing on the API deprecation process, those
> blocking
> > > issues deserve attention and should be put into the FLIP. The current
> > FLIP
> > > seems 

[ANNOUNCE] Kafka connector code removed from apache/master

2023-06-27 Thread Mason Chen
Hi all,

I would like to inform you that we have removed the Kafka connector code
from the Flink main repo. This should reduce developer confusion about
which repo to submit PRs to.

Regarding a few nuances, we have kept the Confluent avro format in the main
repo. This is because the format is actually connector agnostic. Also, to
unblock the code removal for the 1.18 release, we have pinned the connector
versions in tests/examples, and these are the followup items [1][2][3] to
refactor the main repo code because the Kafka connector dependency is not
required in these cases.

A big thanks to Gordon, Martijn, and Chesnay, who helped review the work
from the beginning to the end!

[1] https://issues.apache.org/jira/browse/FLINK-32449
[2] https://issues.apache.org/jira/browse/FLINK-32451
[3] https://issues.apache.org/jira/browse/FLINK-32452

Best,
Mason


[jira] [Created] (FLINK-32452) Refactor SQL Client E2E Test to Remove Kafka SQL Connector Dependency

2023-06-27 Thread Mason Chen (Jira)
Mason Chen created FLINK-32452:
--

 Summary: Refactor SQL Client E2E Test to Remove Kafka SQL 
Connector Dependency
 Key: FLINK-32452
 URL: https://issues.apache.org/jira/browse/FLINK-32452
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Client, Tests
Affects Versions: 1.18.0
Reporter: Mason Chen


Since the Kafka connector has been externalized, we should remove dependencies 
on the external Kafka connector. The SQL Client E2E test can use a different 
connector to exercise this functionality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32451) Refactor Confluent Schema Registry E2E Tests to remove Kafka connector dependency

2023-06-27 Thread Mason Chen (Jira)
Mason Chen created FLINK-32451:
--

 Summary: Refactor Confluent Schema Registry E2E Tests to remove 
Kafka connector dependency
 Key: FLINK-32451
 URL: https://issues.apache.org/jira/browse/FLINK-32451
 Project: Flink
  Issue Type: Technical Debt
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Tests
Affects Versions: 1.18.0
Reporter: Mason Chen


Since the Kafka connector has been externalized, we should remove dependencies 
on the external Kafka connector. We can use a different connector to test the 
confluent schema registry format since the format is connector agnostic. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32450) Update Kafka CI setup to latest version for PRs and nightly builds

2023-06-27 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32450:
--

 Summary: Update Kafka CI setup to latest version for PRs and 
nightly builds
 Key: FLINK-32450
 URL: https://issues.apache.org/jira/browse/FLINK-32450
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Reporter: Martijn Visser
Assignee: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32449) Refactor state machine examples to remove Kafka dependency

2023-06-27 Thread Mason Chen (Jira)
Mason Chen created FLINK-32449:
--

 Summary: Refactor state machine examples to remove Kafka dependency
 Key: FLINK-32449
 URL: https://issues.apache.org/jira/browse/FLINK-32449
 Project: Flink
  Issue Type: Technical Debt
  Components: Documentation
Affects Versions: 1.18.0
Reporter: Mason Chen


Since the Kafka connector has been externalized, we should remove dependencies 
on the external Kafka connector. In this case, we should replace the 
KafkaSource with an example-specific generator source, also deleting the 
KafkaEventsGeneratorJob.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32448) Connector Shared Utils checks out wrong branch when running CI for PRs

2023-06-27 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32448:
--

 Summary: Connector Shared Utils checks out wrong branch when 
running CI for PRs
 Key: FLINK-32448
 URL: https://issues.apache.org/jira/browse/FLINK-32448
 Project: Flink
  Issue Type: Bug
Reporter: Martijn Visser
Assignee: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

2023-06-27 Thread Feng Jin
Hi Shammon
Thank you for proposing this FLIP. I think the Flink Job lineage is a very
useful feature.
I have a few questions:

1. For DataStream Jobs, users need to set up lineage relationships when
building DAGs for their custom sources and sinks.
However, for some common connectors such as Kafka Connector and JDBC
Connector, we can add a lineage interface like `supportReportLineage`, so
that these connectors can implement it.
This way, in the scenario of DataStream Jobs, lineages can be automatically
reported. What do you think?


2. From the current design, it seems that we need to analyze column lineage
through pipeline. As far as I know, it is relatively easy to obtain column
lineage through Calcite MetaQuery API.
Would you consider using this approach? Or do we need to implement another
parsing process based on the pipeline?
```
RelMetadataQuery metadataQuery = relNode.getCluster().getMetadataQuery();
metadataQuery.getColumnOrigins(inputRel, i);
```
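
For illustration, a hedged sketch that expands the snippet above into a small helper. It
assumes a planned Calcite RelNode is already available (how to obtain it from a Flink
TableEnvironment is not shown), and it reports origins for the node's own output columns:

```
import java.util.Set;

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

final class ColumnLineagePrinter {
    // Prints, for every output column of the given RelNode, the originating
    // table(s) and column ordinal(s) as reported by Calcite's metadata query.
    static void printColumnOrigins(RelNode relNode) {
        RelMetadataQuery mq = relNode.getCluster().getMetadataQuery();
        for (int i = 0; i < relNode.getRowType().getFieldCount(); i++) {
            Set<RelColumnOrigin> origins = mq.getColumnOrigins(relNode, i);
            if (origins == null) {
                continue; // origin could not be determined
            }
            for (RelColumnOrigin origin : origins) {
                RelOptTable table = origin.getOriginTable();
                System.out.printf(
                        "output column %d <- table %s, column %d%n",
                        i, table.getQualifiedName(), origin.getOriginColumnOrdinal());
            }
        }
    }
}
```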
Best,
Feng


On Sun, Jun 25, 2023 at 8:06 PM Shammon FY  wrote:

> Hi yuxia and Yun,
>
> Thanks for your input.
>
> For yuxia:
> > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
>
> At present, we only need to notify the listener when a job goes to
> termination, but I think it makes sense to add generic `oldStatus` and
> `newStatus` in the listener and users can update the job state in their
> service as needed.
>
> > 2: I'm really confused about the `config()` included in `LineageEntity`,
> where is it from and what is it for ?
>
> The `config` in `LineageEntity` is used for users to get options for source
> and sink connectors. As the examples in the FLIP, users can add
> server/group/topic information in the config for kafka and create lineage
> entities for `DataStream` jobs, then the listeners can get this information
> to identify the same connector in different jobs. Otherwise, the `config`
> in `TableLineageEntity` will be the same as `getOptions` in
> `CatalogBaseTable`.
>
> > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity` is
> needed or not, since `TableSinkLineageEntity` contains
> `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> changelogmode?
>
> At present, we do not actually use the changelog mode. It can be deleted,
> and I have updated FLIP.
>
> > Btw, since there're a lot interfaces proposed, I think it'll be better to
> give an example about how to implement a listener in this FLIP to make us
> know better about the interfaces.
>
> I have added the example in the FLIP and the related interfaces and
> examples are in branch [1].
>
> For Yun:
> > I have one more question on the lookup-join dim tables, it seems this
> FLIP does not touch them, and will them become part of the
> List sources() or adding another interface?
>
> You're right, currently lookup join dim tables were not considered in the
> 'proposed changed' section of this FLIP. But the interface for lineage is
> universal and we can give `TableLookupSourceLineageEntity` which implements
> `TableSourceLineageEntity` in the future without modifying the public
> interface.
>
> > By the way, if you want to focus on job lineage instead of data column
> lineage in this FLIP, why we must introduce so many column-lineage related
> interface here?
>
> The lineage information in SQL jobs includes table lineage and column
> lineage. Although SQL jobs currently do not support column lineage, we
> would like to support this in the next step. So we have comprehensively
> considered the table lineage and column lineage interfaces here, and
> defined these two interfaces together clearly
>
>
> [1]
>
> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>
> Best,
> Shammon FY
>
>
> On Sun, Jun 25, 2023 at 4:17 PM Yun Tang  wrote:
>
> > Hi Shammon,
> >
> > I like the idea in general and it will help to analysis the job lineages
> > no matter FlinkSQL or Flink jar jobs in production environments.
> >
> > For Qingsheng's concern, I'd like the name of JobType more than
> > RuntimeExecutionMode, as the latter one is not easy to understand for
> users.
> >
> > I have one more question on the lookup-join dim tables, it seems this
> FLIP
> > does not touch them, and will them become part of the List
> > sources()​ or adding another interface?
> >
> > By the way, if you want to focus on job lineage instead of data column
> > lineage in this FLIP, why we must introduce so many column-lineage
> related
> > interface here?
> >
> >
> > Best
> > Yun Tang
> > 
> > From: Shammon FY 
> > Sent: Sunday, June 25, 2023 16:13
> > To: dev@flink.apache.org 
> > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
> >
> > Hi Qingsheng,
> >
> > Thanks for your valuable feedback.
> >
> > > 1. Is there any specific use case to expose the batch / streaming info
> to
> > listeners or meta services?
> >
> > I agree with you that Flink is evolving towards batch-streaming
> > 

Re: Async I/O: preserve stream order for requests on key level

2023-06-27 Thread Teoh, Hong
Hi Juho,

Thank you for bringing this up! Definitely +1 to this. We have had similar 
requests for the AsyncSink as well.
As a side note, it would be useful to share the same implementation for both 
somehow, to prevent duplicate code.

Happy to help with the implementation here.

For the AsyncSink, this came up when we were implementing a sink to DynamoDB. 
We use a batchWrite, and this doesn’t allow two requests on the same key in the 
same batch write. So in our case, since DDB overwrites, we take the latest 
record received from the Flink job graph as truth. [1]

[1] 
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L169-L171
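
For illustration, a minimal sketch of the keep-the-latest-record-per-key idea described above
(BatchDedup and keyOf are made-up names, not the connector's actual code):

```
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class BatchDedup {
    // Keep only the latest request per key within one batch: DynamoDB overwrites
    // on write and batchWrite rejects duplicate keys, so the last record received
    // from the job graph is taken as the truth.
    static <K, T> Collection<T> latestPerKey(List<T> batch, Function<T, K> keyOf) {
        Map<K, T> latest = new LinkedHashMap<>();
        for (T element : batch) {
            latest.put(keyOf.apply(element), element); // later entries overwrite earlier ones
        }
        return latest.values();
    }
}
```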


Regards,
Hong




> On 26 Jun 2023, at 03:52, Jark Wu  wrote:
> 
> 
> This is not supported by the current Async I/O API.
> But I do think this is a very useful feature and we should support it.
> Just as Jingsong said this allows changelog stream can also use Async
> Lookup Join.
> The rough idea is just like a mixture of the ordered and unordered modes of
> async operator.
> Requests for the same key should be handled in the ordered mode,
> but requests for different keys can be handled in the unordered mode.
> 
> Best,
> Jark
> 
> On Wed, 21 Jun 2023 at 12:18, Jingsong Li  wrote:
> 
>> +1 for this.
>> 
>> Actually, this is a headache for Flink SQL too.
>> 
>> There is certainly a lot of updated data (CDC changelog) in real
>> stream processing, The semantics here is the need to ensure the order
>> between keys, and different keys can be handled in disorder.
>> 
>> I'm very happy that the community has a similar need, and I think it's
>> worth refining it in Flink.
>> 
>> Best,
>> Jingsong
>> 
>> On Tue, Jun 20, 2023 at 10:20 PM Juho Autio
>>  wrote:
>>> 
>>> Thank you very much! It seems like you have a quite similar goal.
>> However,
>>> could you clarify: do you maintain the stream order on key level, or do
>> you
>>> just limit the parallel requests per key to one without caring about the
>>> order?
>>> 
>>> I'm not 100% sure how your implementation with futures is done. If you
>> are
>>> able to share a code snippet that would be much appreciated!
>>> 
>>> I'm also wondering what kind of memory implication that implementation
>> has:
>>> would the futures be queued inside the operator without any limit? Would
>> it
>>> be a problem if the same key has too many records within the same time
>>> window? But I suppose the function can be made blocking to protect
>> against
>>> that.
>>> 
>>> On Tue, Jun 20, 2023 at 3:34 PM Galen Warren
>>>  wrote:
>>> 
 Hi Juho -- I'm doing something similar. In my case, I want to execute
>> async
 requests concurrently for inputs associated with different keys but
>> issue
 them sequentially for any given key. The way I do it is to create a
>> keyed
 stream and use it as an input to an async function. In this
>> arrangement,
 all the inputs for a given key are handled by a single instance of the
 async function; inside that function instance, I use a map to keep
>> track of
 any in-flight requests for a given key. When a new input comes in for a
 key, if there is an existing in-flight request for that key, the future
 that is constructed for the new request is constructed as [existing
 request].then([new request]) so that the new one is only executed once
>> the
 in-flight request completes. The futures are constructed in such a way
>> that
 they maintain the map properly after completing.
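
For illustration, a minimal, hedged sketch of the per-key chaining idea described above (not
Galen's actual code; MyEvent, MyResult, and callExternalService are placeholders, and error
handling is simplified):

```
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class PerKeyOrderedAsyncFunction extends RichAsyncFunction<
        PerKeyOrderedAsyncFunction.MyEvent, PerKeyOrderedAsyncFunction.MyResult> {

    // key -> the most recently scheduled request for that key
    private transient Map<String, CompletableFuture<MyResult>> inFlight;

    @Override
    public void open(Configuration parameters) {
        inFlight = new ConcurrentHashMap<>();
    }

    @Override
    public void asyncInvoke(MyEvent event, ResultFuture<MyResult> resultFuture) {
        final String key = event.getKey();
        final CompletableFuture<MyResult> previous =
                inFlight.getOrDefault(key, CompletableFuture.completedFuture(null));

        // Only issue the new request once the previous request for the same key
        // is done, so requests for one key run strictly in arrival order.
        final CompletableFuture<MyResult> current =
                previous.thenCompose(ignored -> callExternalService(event));
        inFlight.put(key, current);

        current.whenComplete((result, error) -> {
            // Drop the map entry only if no newer request was chained behind this one.
            inFlight.remove(key, current);
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(result));
            }
        });
    }

    // Placeholder for the real asynchronous client call.
    private CompletableFuture<MyResult> callExternalService(MyEvent event) {
        return CompletableFuture.completedFuture(new MyResult());
    }

    // Illustrative placeholder types.
    public static class MyEvent { public String getKey() { return "k"; } }
    public static class MyResult {}
}
```

Because the requests for one key are chained sequentially, their results also complete in
order, so per-key order is preserved even when the async operator handles different keys in
an unordered fashion.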
 
 
 On Mon, Jun 19, 2023 at 10:55 AM Juho Autio
>> 
 wrote:
 
> I need to make some slower external requests in parallel, so Async
>> I/O
> helps greatly with that. However, I also need to make the requests
>> in a
> certain order per key. Is that possible with Async I/O?
> 
> The documentation[1] talks about preserving the stream order of
> results, but it doesn't discuss the order of the async requests. I
>> tried
 to
> use AsyncDataStream.orderedWait, but the order of async requests
>> seems to
> be random – the order of calls gets shuffled even if I
> use AsyncDataStream.orderedWait.
> 
> If that is by design, would there be any suggestion how to work
>> around
> that? I was thinking of collecting all events of the same key into a
> List, so that the async operator gets a list instead of individual
 events.
> There are of course some downsides with using a List, so I would
>> rather
> have something better.
> 
> In a nutshell my code is:
> 
> AsyncDataStream.orderedWait(stream.keyBy(key), asyncFunction)
> 
> The asyncFunction extends RichAsyncFunction.
> 
> Thanks!
> 
> [1]
> 
> 

[jira] [Created] (FLINK-32447) table hints lost when they inside a view referenced by an external query

2023-06-27 Thread lincoln lee (Jira)
lincoln lee created FLINK-32447:
---

 Summary: table hints lost when they inside a view referenced by an 
external query
 Key: FLINK-32447
 URL: https://issues.apache.org/jira/browse/FLINK-32447
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: lincoln lee
Assignee: lincoln lee
 Fix For: 1.18.0, 1.17.2


Table hints will be lost when they are inside a view referenced by an external query. 
This is due to the upgrade to Calcite 1.28 (affected by CALCITE-4640, which changed 
the default implementation of SqlDialect to suppress all table hints).
This can be reproduced by adding a new case to the current 
{code}OptionsHintTest{code}:
{code}
+
+  @Test
+  def testOptionsHintInsideView(): Unit = {
+    util.tableEnv.executeSql(
+      "create view v1 as select * from t1 /*+ OPTIONS(k1='#v111', k4='#v444')*/")
+    util.verifyExecPlan(s"""
+                           |select * from t2 join v1 on v1.a = t2.d
+                           |""".stripMargin)
+  }
{code}
Wrong plan, which loses the table hints (dynamic options):
{code}
Join(joinType=[InnerJoin], where=[(a = d)], select=[d, e, f, a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[d]])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={k3=v3, k4=v4})]]], fields=[d, e, f])
+- Exchange(distribution=[hash[a]])
   +- Calc(select=[a, b, (a + 1) AS c])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=v1, k2=v2})]]], fields=[a, b])
{code}

We should use {code}AnsiSqlDialect{code} instead to preserve table hints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32446) MongoWriter should regularly check whether the last write time is greater than the specified time.

2023-06-27 Thread Jiabao Sun (Jira)
Jiabao Sun created FLINK-32446:
--

 Summary: MongoWriter should regularly check whether the last write 
time is greater than the specified time.
 Key: FLINK-32446
 URL: https://issues.apache.org/jira/browse/FLINK-32446
 Project: Flink
  Issue Type: Bug
  Components: Connectors / MongoDB
Affects Versions: mongodb-1.0.1, mongodb-1.0.0
Reporter: Jiabao Sun


The Mongo sink waits for a new record before writing previous records. I have an 
upsert-kafka topic that already contains some events. I start a new upsert-kafka 
to MongoDB sink job. I expect all the data from the topic to be loaded into MongoDB 
right away, but instead only the first record is written. The rest of the records 
don't arrive in MongoDB until a new event is written to the Kafka topic, and that 
new event is in turn delayed until the next event arrives.

To prevent this problem, the MongoWriter should regularly check whether the time 
since the last write exceeds the configured flush interval.
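
For illustration, a self-contained sketch of the kind of periodic check described above (the 
class and method names are made up; this is not the actual MongoWriter patch):
{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicFlushCheck {
    private final long flushIntervalMs;
    private volatile long lastSendTimeMs = System.currentTimeMillis();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public PeriodicFlushCheck(long flushIntervalMs, Runnable flushAction) {
        this.flushIntervalMs = flushIntervalMs;
        scheduler.scheduleWithFixedDelay(
                () -> {
                    // flush when the last write is older than the configured interval
                    if (System.currentTimeMillis() - lastSendTimeMs >= flushIntervalMs) {
                        flushAction.run();
                        lastSendTimeMs = System.currentTimeMillis();
                    }
                },
                flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void recordWrite() {
        lastSendTimeMs = System.currentTimeMillis();
    }

    public void close() {
        scheduler.shutdownNow();
    }
}
{code}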



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-connector-jdbc v3.1.1, release candidate #2

2023-06-27 Thread Yuepeng Pan
+1 (non-binding)

- verified signatures

- compiled from sources
- ran tests locally
- checked release notes
Best,
Yuepeng Pan

At 2023-06-27 07:42:00, "Sergey Nuyanzin"  wrote:
>+1 (non-binding)
>
>- verified hashes
>- verified signatures
>- built from sources
>- checked release notes
>- review web pr
>
>Non-blocking finding:
>  it seems the release date in web PR should be changed
>
>On Mon, Jun 26, 2023 at 1:34 PM Leonard Xu  wrote:
>
>> +1 (binding)
>>
>> - built from source code succeeded
>> - verified signatures
>> - verified hashsums
>> - checked release notes
>> - checked the contents contains jar and pom files in apache repo
>> - reviewed the web PR
>>
>> Best,
>> Leonard
>>
>> > On Jun 19, 2023, at 6:57 PM, Danny Cranmer 
>> wrote:
>> >
>> > Thanks for driving this Martijn.
>> >
>> > +1 (binding)
>> >
>> > - Reviewed web PR
>> > - Jira release notes look good
>> > - Tag exists in Github
>> > - Source archive signature/checksum looks good
>> > - Binary (from Maven) signature/checksum looks good
>> > - No binaries in the source archive
>> > - Source archive builds from source and tests pass
>> > - CI passes [1]
>> >
>> > Non blocking findings:
>> > - NOTICE files year is 2022 and needs to be updated to 2023
>> > - pom.xml is referencing Flink 1.17.0 and can be updated to 1.17.1
>> > - Some unit tests (notably OracleExactlyOnceSinkE2eTest) appear to be
>> > integration/e2e and are run in the unit test suite
>> >
>> > Thanks,
>> > Danny
>> >
>> > [1]
>> https://github.com/apache/flink-connector-jdbc/actions/runs/5278297177
>> >
>> > On Thu, Jun 15, 2023 at 12:40 PM Martijn Visser <
>> martijnvis...@apache.org>
>> > wrote:
>> >
>> >> Hi everyone,
>> >> Please review and vote on the release candidate #2 for the version
>> 3.1.1,
>> >> as follows:
>> >> [ ] +1, Approve the release
>> >> [ ] -1, Do not approve the release (please provide specific comments)
>> >>
>> >>
>> >> The complete staging area is available for your review, which includes:
>> >> * JIRA release notes [1],
>> >> * the official Apache source release to be deployed to dist.apache.org
>> >> [2],
>> >> which are signed with the key with fingerprint
>> >> A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
>> >> * all artifacts to be deployed to the Maven Central Repository [4],
>> >> * source code tag v3.1.1-rc2 [5],
>> >> * website pull request listing the new release [6].
>> >>
>> >> The vote will be open for at least 72 hours. It is adopted by majority
>> >> approval, with at least 3 PMC affirmative votes.
>> >>
>> >> Thanks,
>> >> Release Manager
>> >>
>> >> [1]
>> >>
>> >>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353281
>> >> [2]
>> >>
>> https://dist.apache.org/repos/dist/dev/flink/flink-connector-jdbc-3.1.1-rc2
>> >> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>> >> [4]
>> https://repository.apache.org/content/repositories/orgapacheflink-1642
>> >> [5] https://github.com/apache/flink-connector-jdbc/releases/tag/v3.1.1-rc2
>> >> [6] https://github.com/apache/flink-web/pull/654
>> >>
>>
>>
>
>-- 
>Best regards,
>Sergey


Re: [DISCUSS] Persistent SQL Gateway

2023-06-27 Thread Shammon FY
Hi Ferenc,

If I understand correctly, there will be two types of jobs in sql-gateway:
`SELECT` and `NON-SELECT` such as `INSERT`.

1. `SELECT` jobs need to collect results from Flink cluster in a
corresponding session of sql gateway, and when the session is closed, the
job should be canceled. These jobs are generally short queries similar to
OLAP and I think it may be acceptable.

2. `NON-SELECT` jobs may be batch or streaming jobs, and when the jobs are
submitted successfully, they won't be killed or canceled even if the
session or sql-gateway is closed. After these jobs are successfully
submitted, their lifecycle is no longer managed by the SQL gateway.

I don't know if it covers your usage scenario. Could you describe yours for
us to test and confirm?

Best,
Shammon FY


On Tue, Jun 27, 2023 at 6:43 PM Ferenc Csaky 
wrote:

> Hi Jark,
>
> In the current implementation, any job submitted via the SQL Gateway has
> to be done through a session, because all the operations are grouped under
> sessions.
>
> Starting from there, if I close a session, that will close the
> "SessionContext", which closes the "OperationManager" [1], and the
> "OperationManager" closes all submitted operations tied to that session
> [2], which results in closing all the jobs executed in the session.
>
> Maybe I am missing something, but my experience is that the jobs I submit
> via the SQL Gateway are getting cleaned up on gateway session close.
>
> WDYT?
>
> Cheers,
> F
>
> [1]
> https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java#L204
> [2]
> https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java#L194
>
>
>
> --- Original Message ---
> On Tuesday, June 27th, 2023 at 04:37, Jark Wu  wrote:
>
>
> >
> >
> > Hi Ferenc,
> >
> > But the job lifecycle doesn't tie to the SQL Gateway session.
> > Even if the session is closed, all the running jobs are not affected.
> >
> > Best,
> > Jark
> >
> >
> >
> >
> > On Tue, 27 Jun 2023 at 04:14, Ferenc Csaky ferenc.cs...@pm.me.invalid
> >
> > wrote:
> >
> > > Hi Jark,
> > >
> > > Thank you for pointing out FLIP-295 abouth catalog persistence, I was
> not
> > > aware the current state. Although as far as I see, that persistent
> catalogs
> > > are necessary, but not sufficient achieving a "persistent gateway".
> > >
> > > The current implementation ties the job lifecycle to the SQL gateway
> > > session, so if it gets closed, it will cancel all the jobs. So that
> would
> > > be the next step I think. Any work or thought regarding this aspect?
> We are
> > > definitely willing to help out on this front.
> > >
> > > Cheers,
> > > F
> > >
> > > --- Original Message ---
> > > On Sunday, June 25th, 2023 at 06:23, Jark Wu imj...@gmail.com wrote:
> > >
> > > > Hi Ferenc,
> > > >
> > > > Making SQL Gateway to be an easy-to-use platform infrastructure of
> Flink
> > > > SQL
> > > > is one of the important roadmaps 1.
> > > >
> > > > The persistence ability of the SQL Gateway is a major work in 1.18
> > > > release.
> > > > One of the persistence demand is that the registered catalogs are
> > > > currently
> > > > kept in memory and lost when Gateway restarts. There is an accepted
> FLIP
> > > > (FLIP-295)[2] target to resolve this issue and make Gateway can
> persist
> > > > the
> > > > registered catalogs information into files or databases.
> > > >
> > > > I'm not sure whether this is something you are looking for?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > [2]:
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > >
> > > > On Fri, 23 Jun 2023 at 00:25, Ferenc Csaky ferenc.cs...@pm.me.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Hello devs,
> > > > >
> > > > > I would like to open a discussion about persistence possibilitis
> for
> > > > > the
> > > > > SQL Gateway. At Cloudera, we are happy to see the work already
> done on
> > > > > this
> > > > > project and looking for ways to utilize it on our platform as
> well, but
> > > > > currently it lacks some features that would be essential in our
> case,
> > > > > where
> > > > > we could help out.
> > > > >
> > > > > I am not sure if any thought went into gateway persistence
> specifics
> > > > > already, and this feature could be implemented in fundamentally
> > > > > differnt
> > > > > ways, so I think the frist step could be to agree on the basics.
> > > > >
> > > > > First, in my opinion, persistence should be an optional feature of
> the
> > > > > gateway, that can be enabled if desired. There can be a lot of
> > > > > implementation details, but there can be some major directions to
> > > > > follow:
> > > > >
> > > > > - Utilize Hive catalog: 

Re: [DISCUSS] Release 2.0 Work Items

2023-06-27 Thread Xintong Song
Hi Alex & Gyula,

By compatibility discussion do you mean the "[DISCUSS] FLIP-321: Introduce
> an API deprecation process" thread [1]?
>

Yes, I meant the FLIP-321 discussion. I just noticed I pasted the wrong url
in my previous email. Sorry for the mistake.

I am also curious to know if the rationale behind this new API has been
> previously discussed on the mailing list. Do we have a list of shortcomings
> in the current DataStream API that it tries to resolve? How does the
> current ProcessFunction functionality fit into the picture? Will it be kept
> as is or subsumed by new API?
>

I don't think we should create a replacement for the DataStream API unless
> we have a very good reason to do so and with a proper discussion about this
> as Alex said.


The ProcessFunction API, which is intended to replace the DataStream API, is
still a proposal, not a decision. Sorry for the confusion, I should have
been more careful with my words, not giving the impression that this is
something we'll do anyway.

There will be a FLIP describing the motivations and designs in detail, for
the community to discuss and vote on. We are still working on it. TBH, this
is not trivial and we would need more time on it.

Just to quickly share some backgrounds:

   - We see quite some problems with the current DataStream APIs
  - Users are working with concrete classes rather than interfaces,
  which means
  - Users can access methods that are designed to be used by internal
 classes, even though they are annotated with `@Internal`. E.g.,
 `DataStream#getTransformation`.
 - Changes to the non-API implementations (e.g., `Transformation`)
 would affect the API classes (e.g., `DataStream`), which
makes it hard to
 provide binary compatibility.
  - Internal classes are used as parameter / return-value of public
  APIs. E.g., while `AbstractStreamOperator` is PublicEvolving,
`StreamTask`
  which returns from `AbstractStreamOperator#getContainingTask` is Internal.
  - In many cases, users are asked to extend the API classes, rather
  than implementing interfaces. E.g., `AbstractStreamOperator`.
 - Any changes to the base classes, even the internal part, may
 affect the behavior of the user-provided sub-classes
 - Users can override the behavior of the base classes
  - The API module `flink-streaming-java` contains non-API classes, and
  depends on internal modules such as `flink-runtime`, which means
  - Changes to the internal modules may affect the API modules, which
 requires users to re-build their applications upon upgrading
 - The artifact user needs for building their application larger
 than necessary.
  - We probably should not expose operators (e.g.,
  `AbstractStreamOperator`) to users. Functions should be enough
for users to
  define their data processing logics. Exposing operator-level concepts
  (e.g., mailbox thread model, checkpoint barrier alignment, etc.) is
  unnecessary and limits the improvement regarding such exposed mechanisms
  with compatibility considerations.
  - The current DataStream API seems to be a mixture of many things,
  making it hard to understand especially for newcomers. It might be better
  to re-organize it into several parts: (the taxonomy below are just an
  example of the, we are still working on this)
 - The most fundamental stateful stream processing: streams,
 partitions / key, process functions, state, timeline-service
 - An extension for common batch-streaming unified functions: map,
 flatmap, filter, agg, reduce, join, etc.
 - An extension for windowing supports:  window, triggering
 - An extension for event-time supports: event time, watermark
 - The extensions are like short-cuts / sugars, without which users
 can probably still achieve the same behavior by working with the
 fundamental APIs, but would be a lot easier with the extensions
  - The original plan was to do in-place refactors / changes on
   DataStream API. Some related items are listed in this doc [2] attached to
   the kicking off email [3]. Not all of the above issues are listed, because
   we haven't looked into this as deeply as now  by that time.
   - We proposed this as a new API rather than in-place refactors in the
   2.0 work item list, because we realized the changes might be too big for an
   in-place change. First having a new API then gradually retiring the old one
   would help users to smoothly migrate between them.

A thorough discussion is definitely needed once the FLIP is out. And of
course it's possible that the FLIP might be rejected. Given that we are
planning for release 2.0, I just feel it would be better to bring this up
early even though the concrete plan is not yet ready.

Best,

Xintong


[1] https://lists.apache.org/thread/vmhzv8fcw2b33pqxp43486owrxbkd5x9
[2]

Re: Flink Mongodb Sink - Issues

2023-06-27 Thread Jiabao Sun
Hi Harish,

Thanks for reporting this issue. There are currently 5 write modes:

1. Flush only on checkpoint
'sink.buffer-flush.interval' = '-1' and 'sink.buffer-flush.max-rows' = '-1'

2. Flush for every single element
'sink.buffer-flush.interval' = '0' or 'sink.buffer-flush.max-rows' = '1'

3. Flush when the time interval exceeds the limit
'sink.buffer-flush.interval' > '0' and 'sink.buffer-flush.max-rows' = '-1'

4. Flush when the batch size exceeds the limit
'sink.buffer-flush.max-rows' > '1' and 'sink.buffer-flush.interval' = '-1'

5. Flush when the time interval or batch size exceeds the limit
'sink.buffer-flush.interval' > '0' and 'sink.buffer-flush.max-rows' > '1'

The default write mode is 5, with 'sink.buffer-flush.interval' = '1s' and 
'sink.buffer-flush.max-rows' = '1000'.
Whether the buffered data is flushed to MongoDB is decided at checkpoint time or when the 
next record comes in.
Currently, there is no separate thread that regularly checks whether the interval 
since the last written record exceeds the limit. 
Therefore, when checkpointing is not enabled, writes can be delayed.

You can try to enable checkpointing or use the single-record write mode (mode 2) to 
avoid this problem.
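
For illustration, a hedged sketch of a table definition that selects write mode 2 (flush on 
every element); the table schema and the connection options other than the flush option are 
assumed placeholders:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoFlushPerRecordExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'sink.buffer-flush.max-rows' = '1' selects write mode 2, so every record
        // is flushed immediately instead of waiting for a checkpoint or more input.
        tEnv.executeSql(
                "CREATE TABLE mongo_sink ("
                        + " id STRING,"
                        + " cnt BIGINT,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'mongodb',"
                        + " 'uri' = 'mongodb://localhost:27017'," // placeholder URI
                        + " 'database' = 'mydb',"                 // placeholder database
                        + " 'collection' = 'results',"            // placeholder collection
                        + " 'sink.buffer-flush.max-rows' = '1'"   // flush on every record
                        + ")");
    }
}
```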


Best,
Jiabao



> On Jun 27, 2023, at 6:00 PM, 
>  wrote:
> 
> Hi,
> 
> I am using the flink version 1.7.1 and flink-mongodb-sql-connector version 
> 1.0.1-1.17.
> 
> Below is the data pipeline flow.
> 
> Source 1 (Kafka topic using Kafka connector) -> Window Aggregation  (legacy 
> grouped window aggregation) -> Sink (Kafka topic using upsert-kafka 
> connector) -> Sink(Mongdb collection).
> 
> 
> I noticed a couple of issues with the mongodb-sink connector.
> 
> Issue 1: Mongo sink delays event write as soon as a tombstone message is 
> received.
> 
> When a new key comes in, the aggregation is made and the result is written 
> successfully to kafka topic and also to mongo db immediately.
> 
> Another new key comes in, the aggregation is made, the result is available in 
> mongodb.
> 
> But when a key repeats for the first time, the aggregation is made, the 
> results are written to kafka topic. You get 1 delete record and 1 latest 
> record for the key in the topic.  The data for the key is deleted in the 
> mongodb but, the latest record for the key is not inserted to mongodb.
> 
> When a new key or another existing key comes in, the previous key's latest 
> record is inserted into mongo db.
> 
> The same pattern exists for subsequent records.
> 
> There is always a delay of one event as soon as a tombstone record is found 
> by the connector.
> 
> Issue 2: Mongo sink waits for new record to write previous records.
> 
> I have an upsert-kafka topic that already contains some events.
> I start a new upsert-kafka to mongo db sink job.
> 
> I expect all the data from the topic to be loaded to mongodb right away.
> But instead, only the first record is written to mongo db.
> The rest of the records don’t arrive in mongodb  until a  new event is 
> written to kafka topic.
> The new event that was written is delayed until the next event arrives.
> 
> I am not able to understand this behaviour.
> This doesn’t feel like an expected behaviour.
> Can someone please advise whether I am missing something or an issue exists.
> 
> Regards,
> Harish.
> 
> 
> 
> 
> 



Re: [DISCUSS] Persistent SQL Gateway

2023-06-27 Thread Ferenc Csaky
Hi Jark,

In the current implementation, any job submitted via the SQL Gateway has to be 
done through a session, because all the operations are grouped under sessions.

Starting from there, if I close a session, that will close the 
"SessionContext", which closes the "OperationManager" [1], and the 
"OperationManager" closes all submitted operations tied to that session [2], 
which results in closing all the jobs executed in the session.

Maybe I am missing something, but my experience is that the jobs I submit via 
the SQL Gateway are getting cleaned up on gateway session close.

WDYT?

Cheers,
F

[1] 
https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java#L204
[2] 
https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java#L194



--- Original Message ---
On Tuesday, June 27th, 2023 at 04:37, Jark Wu  wrote:


> 
> 
> Hi Ferenc,
> 
> But the job lifecycle doesn't tie to the SQL Gateway session.
> Even if the session is closed, all the running jobs are not affected.
> 
> Best,
> Jark
> 
> 
> 
> 
> On Tue, 27 Jun 2023 at 04:14, Ferenc Csaky ferenc.cs...@pm.me.invalid
> 
> wrote:
> 
> > Hi Jark,
> > 
> > Thank you for pointing out FLIP-295 about catalog persistence, I was not
> > aware of the current state. Although as far as I see, persistent catalogs
> > are necessary, but not sufficient for achieving a "persistent gateway".
> > 
> > The current implementation ties the job lifecycle to the SQL gateway
> > session, so if it gets closed, it will cancel all the jobs. So that would
> > be the next step I think. Any work or thought regarding this aspect? We are
> > definitely willing to help out on this front.
> > 
> > Cheers,
> > F
> > 
> > --- Original Message ---
> > On Sunday, June 25th, 2023 at 06:23, Jark Wu imj...@gmail.com wrote:
> > 
> > > Hi Ferenc,
> > > 
> > > Making SQL Gateway to be an easy-to-use platform infrastructure of Flink
> > > SQL
> > > is one of the important roadmaps 1.
> > > 
> > > The persistence ability of the SQL Gateway is a major work in 1.18
> > > release.
> > > One of the persistence demand is that the registered catalogs are
> > > currently
> > > kept in memory and lost when Gateway restarts. There is an accepted FLIP
> > > (FLIP-295)[2] target to resolve this issue and make Gateway can persist
> > > the
> > > registered catalogs information into files or databases.
> > > 
> > > I'm not sure whether this is something you are looking for?
> > > 
> > > Best,
> > > Jark
> > > 
> > > [2]:
> > 
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > 
> > > On Fri, 23 Jun 2023 at 00:25, Ferenc Csaky ferenc.cs...@pm.me.invalid
> > > 
> > > wrote:
> > > 
> > > > Hello devs,
> > > > 
> > > > I would like to open a discussion about persistence possibilities for
> > > > the
> > > > SQL Gateway. At Cloudera, we are happy to see the work already done on
> > > > this
> > > > project and looking for ways to utilize it on our platform as well, but
> > > > currently it lacks some features that would be essential in our case,
> > > > where
> > > > we could help out.
> > > > 
> > > > I am not sure if any thought went into gateway persistence specifics
> > > > already, and this feature could be implemented in fundamentally
> > > > different
> > > > ways, so I think the first step could be to agree on the basics.
> > > > 
> > > > First, in my opinion, persistence should be an optional feature of the
> > > > gateway, that can be enabled if desired. There can be a lot of
> > > > implementation details, but there can be some major directions to
> > > > follow:
> > > > 
> > > > - Utilize Hive catalog: The Hive catalog can already be used to have
> > > > persistent meta-objects, so the crucial thing that would be missing in
> > > > this case is other catalogs. Personally, I would not pursue this
> > > > option,
> > > > because in my opinion it would limit the usability of this feature too
> > > > much.
> > > > - Serialize the session as is: Saving the whole session (or its
> > > > context)
> > > > 1 as is to durable storage, so it can be kept and picked up again.
> > > > - Serialize the required elements (catalogs, tables, functions, etc.),
> > > > not
> > > > necessarily as a whole: The main point here would be to serialize a
> > > > different object, so the persistent data will not be that sensitive to
> > > > changes of the session (or its context). There can be numerous factors
> > > > here, like try to keep the model close to the session itself, so the
> > > > boilerplate required for the mapping can be kept to minimal, or focus
> > > > on
> > > > saving what is actually necessary, making the persistent storage more
> > > > 

Flink Mongodb Sink - Issues

2023-06-27 Thread harish.sridhar
Hi,

I am using the flink version 1.7.1 and flink-mongodb-sql-connector version 
1.0.1-1.17.

Below is the data pipeline flow.

Source 1 (Kafka topic using Kafka connector) -> Window Aggregation (legacy 
grouped window aggregation) -> Sink (Kafka topic using upsert-kafka connector) 
-> Sink (MongoDB collection).


I noticed a couple of issues with the mongodb-sink connector.

Issue 1: Mongo sink delays event write as soon as a tombstone message is 
received.

When a new key comes in, the aggregation is made and the result is written 
successfully to kafka topic and also to mongo db immediately.

Another new key comes in, the aggregation is made, the result is available in 
mongodb.

But when a key repeats for the first time, the aggregation is made and the results 
are written to the Kafka topic. You get 1 delete record and 1 latest record for the 
key in the topic. The data for the key is deleted in MongoDB, but the latest record 
for the key is not inserted into MongoDB.

When a new key or another key comes in, the latest record for the previous key is 
inserted into MongoDB.

The same pattern exists for subsequent records.

There is always a delay of one event as soon as a tombstone record is found by 
the connector.

Issue 2: Mongo sink waits for new record to write previous records.

I have an upsert-kafka topic that already has some events.
I start a new upsert-kafka to MongoDB sink job.

I expect all the data from the topic to be loaded into MongoDB right away.
But instead, only the first record is written to MongoDB.
The rest of the records don't arrive in MongoDB until a new event is written 
to the Kafka topic.
The new event that was written is delayed until the next event arrives.

I am not able to understand this behaviour.
This doesn't feel like expected behaviour.
Can someone please advise whether I am missing something or whether an issue exists?

Regards,
Harish.







Re: [DISCUSS] Release 2.0 Work Items

2023-06-27 Thread Gyula Fóra
Hey!

I share the same concerns mentioned above regarding the "ProcessFunction
API".

I don't think we should create a replacement for the DataStream API unless
we have a very good reason to do so, and only after a proper discussion, as Alex said.

Cheers,
Gyula

On Tue, Jun 27, 2023 at 11:03 AM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> Hi Xintong,
>
> By compatibility discussion do you mean the "[DISCUSS] FLIP-321: Introduce
> an API deprecation process" thread [1]?
>
> I am also curious to know if the rationale behind this new API has been
> previously discussed on the mailing list. Do we have a list of shortcomings
> in the current DataStream API that it tries to resolve? How does the
> current ProcessFunction functionality fit into the picture? Will it be kept
> as is or subsumed by new API?
>
> [1] https://lists.apache.org/thread/vmhzv8fcw2b33pqxp43486owrxbkd5x9
>
> Best,
> Alex
>
> On Mon, 26 Jun 2023 at 14:33, Xintong Song  wrote:
>
> > >
> > > The ProcessFunction API item is giving me the most headaches because
> it's
> > > very unclear what it actually entails; like is it an entirely separate
> > API
> > > to DataStream (sounds like it is!) or an extension of DataStream. How
> > much
> > > will it share the internals with DataStream etc.; how does it relate to
> > the
> > > Table API (w.r.t. switching APIs / what Table API uses underneath).
> > >
> >
> > I totally understand your confusion. We started planning this after
> kicking
> > off the release 2.0, so there's still a lot to be explored and the plan
> > keeps changing.
> >
> >
> >- In the beginning, we planned to do an in-place refactor of
> DataStream
> >API, until the API migration period is proposed.
> >- Then we want to make it an entirely separate API to DataStream, and
> >listed as a must-have for release 2.0 so that we can remove DataStream
> > once
> >it's ready.
> >- However, depending on the outcome of the API compatibility
> discussion
> >[1], we may not be able to remove DataStream in 2.0 anyway, which
> means
> > we
> >might need to re-evaluate the necessity of this item for 2.0.
> >
> > I'd say we wait a bit longer for the compatibility discussion [1] and
> > decide the priority for this item afterwards.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> > [1] https://lists.apache.org/list.html?dev@flink.apache.org
> >
> >
> > On Mon, Jun 26, 2023 at 6:00 PM Chesnay Schepler 
> > wrote:
> >
> > > by-and-large I'm quite happy with the list of items.
> > >
> > > I'm curious as to why the "Disaggregated State Management" item is
> marked
> > > as a must-have; will it require changes that break something? What
> > prevents
> > > it from being added in 2.1?
> > >
> > > We may want to update the Java 17 item to "Make Java 17 the default,
> drop
> > > Java 8/11". Maybe even split it into a must-have "Drop Java 8" and a
> > > nice-to-have "Drop Java 11"?
> > >
> > > "Move Calcite rules from Scala to Java": I would hope that this would
> be
> > > an entirely internal change, and could thus be an incremental process
> > > independent of major releases.
> > > What is the actual scale of this item; how much are we actually
> > re-writing?
> > >
> > > "Add MetricGroup#getLogicalScope": I'd raise this to a must-have; i
> think
> > > I marked it down as nice-to-have only because it depends on another
> item.
> > >
> > > The ProcessFunction API item is giving me the most headaches because
> it's
> > > very unclear what it actually entails; like is it an entirely separate
> > API
> > > to DataStream (sounds like it is!) or an extension of DataStream. How
> > much
> > > will it share the internals with DataStream etc.; how does it relate to
> > the
> > > Table API (w.r.t. switching APIs / what Table API uses underneath).
> > >
> > > There are a few items I added as ideas which don't have a priority yet;
> > > would love to get some feedback on those.
> > >
> > > On 21/06/2023 08:41, Xintong Song wrote:
> > >
> > > Hi devs,
> > >
> > > As previously discussed in [1], we had been collecting work item
> > proposals
> > > for the 2.0 release until June 15th, on the wiki page [2].
> > >
> > >- As we have passed the due date, I'd like to kindly remind everyone
> > *not
> > >to add / remove items directly on the wiki page*. If needed, please
> > post
> > >in this thread or reach out to the release managers instead.
> > >- I've reached out to some folks for clarifications about their
> > >proposals. Some of them mentioned that they can not yet tell whether
> > we
> > >should do an item or not, and would need more time / discussions to
> > make
> > >the decision. So I added a new symbol for items whose priorities are
> > `TBD`.
> > >
> > > Now it's time to collaboratively decide a minimum set of must-have
> items.
> > > I've gone through the entire list of proposed items, and found most of
> > them
> > > make quite much sense. So I think an online sync might not be necessary
> > 

Re: [DISCUSS] Release 2.0 Work Items

2023-06-27 Thread Alexander Fedulov
Hi Xintong,

By compatibility discussion do you mean the "[DISCUSS] FLIP-321: Introduce
an API deprecation process" thread [1]?

I am also curious to know if the rationale behind this new API has been
previously discussed on the mailing list. Do we have a list of shortcomings
in the current DataStream API that it tries to resolve? How does the
current ProcessFunction functionality fit into the picture? Will it be kept
as is or subsumed by new API?

[1] https://lists.apache.org/thread/vmhzv8fcw2b33pqxp43486owrxbkd5x9

Best,
Alex

On Mon, 26 Jun 2023 at 14:33, Xintong Song  wrote:

> >
> > The ProcessFunction API item is giving me the most headaches because it's
> > very unclear what it actually entails; like is it an entirely separate
> API
> > to DataStream (sounds like it is!) or an extension of DataStream. How
> much
> > will it share the internals with DataStream etc.; how does it relate to
> the
> > Table API (w.r.t. switching APIs / what Table API uses underneath).
> >
>
> I totally understand your confusion. We started planning this after kicking
> off the release 2.0, so there's still a lot to be explored and the plan
> keeps changing.
>
>
>- In the beginning, we planned to do an in-place refactor of DataStream
>API, until the API migration period is proposed.
>- Then we want to make it an entirely separate API to DataStream, and
>listed as a must-have for release 2.0 so that we can remove DataStream
> once
>it's ready.
>- However, depending on the outcome of the API compatibility discussion
>[1], we may not be able to remove DataStream in 2.0 anyway, which means
> we
>might need to re-evaluate the necessity of this item for 2.0.
>
> I'd say we wait a bit longer for the compatibility discussion [1] and
> decide the priority for this item afterwards.
>
>
> Best,
>
> Xintong
>
>
> [1] https://lists.apache.org/list.html?dev@flink.apache.org
>
>
> On Mon, Jun 26, 2023 at 6:00 PM Chesnay Schepler 
> wrote:
>
> > by-and-large I'm quite happy with the list of items.
> >
> > I'm curious as to why the "Disaggregated State Management" item is marked
> > as a must-have; will it require changes that break something? What
> prevents
> > it from being added in 2.1?
> >
> > We may want to update the Java 17 item to "Make Java 17 the default, drop
> > Java 8/11". Maybe even split it into a must-have "Drop Java 8" and a
> > nice-to-have "Drop Java 11"?
> >
> > "Move Calcite rules from Scala to Java": I would hope that this would be
> > an entirely internal change, and could thus be an incremental process
> > independent of major releases.
> > What is the actual scale of this item; how much are we actually
> re-writing?
> >
> > "Add MetricGroup#getLogicalScope": I'd raise this to a must-have; i think
> > I marked it down as nice-to-have only because it depends on another item.
> >
> > The ProcessFunction API item is giving me the most headaches because it's
> > very unclear what it actually entails; like is it an entirely separate
> API
> > to DataStream (sounds like it is!) or an extension of DataStream. How
> much
> > will it share the internals with DataStream etc.; how does it relate to
> the
> > Table API (w.r.t. switching APIs / what Table API uses underneath).
> >
> > There are a few items I added as ideas which don't have a priority yet;
> > would love to get some feedback on those.
> >
> > On 21/06/2023 08:41, Xintong Song wrote:
> >
> > Hi devs,
> >
> > As previously discussed in [1], we had been collecting work item
> proposals
> > for the 2.0 release until June 15th, on the wiki page [2].
> >
> >- As we have passed the due date, I'd like to kindly remind everyone
> *not
> >to add / remove items directly on the wiki page*. If needed, please
> post
> >in this thread or reach out to the release managers instead.
> >- I've reached out to some folks for clarifications about their
> >proposals. Some of them mentioned that they can not yet tell whether
> we
> >should do an item or not, and would need more time / discussions to
> make
> >the decision. So I added a new symbol for items whose priorities are
> `TBD`.
> >
> > Now it's time to collaboratively decide a minimum set of must-have items.
> > I've gone through the entire list of proposed items, and found most of
> them
> > make quite much sense. So I think an online sync might not be necessary
> for
> > this. I'd like to go with this DISCUSS thread, where everyone can comment
> > on how they think the list can be improved, followed by a VOTE to
> formally
> > make the decision.
> >
> > Any feedback and opinions, including but not limited to the following
> > aspects, will be appreciated.
> >
> >- Important items that are missing from the list
> >- Concerns regarding the listed items or their priorities
> >
> > Looking forward to your feedback.
> >
> > Best,
> >
> > Xintong
> >
> >
> > [1]
> https://lists.apache.org/list?dev@flink.apache.org:lte=1M:release%202.0%20status%20updates
> >
> > 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thanks Jark, Jingsong, and Zhu for the review!

Thanks Zhu for the suggestion. I have updated the configuration name as
suggested.

On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu  wrote:

> Thanks Dong and Yunfeng for creating this FLIP and driving this discussion.
>
> The new design looks generally good to me. Increasing the checkpoint
> interval when the job is processing backlogs is easier for users to
> understand and can help in more scenarios.
>
> I have one comment about the new configuration.
> Naming the new configuration
> "execution.checkpointing.interval-during-backlog" would be better
> according to Flink config naming convention.
> It is also because that nested config keys should be avoided. See
> FLINK-29372 for more details.
>
> Thanks,
> Zhu
>
> Jingsong Li  wrote on Tue, Jun 27, 2023, at 15:45:
> >
> > Looks good to me!
> >
> > Thanks Dong, Yunfeng and all for your discussion and design.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
> > >
> > > Thank you Dong for driving this FLIP.
> > >
> > > The new design looks good to me!
> > >
> > > Best,
> > > Jark
> > >
> > > > On Jun 27, 2023, at 14:38, Dong Lin  wrote:
> > > >
> > > > Thank you Leonard for the review!
> > > >
> > > > Hi Piotr, do you have any comments on the latest proposal?
> > > >
> > > > I am wondering if it is OK to start the voting thread this week.
> > > >
> > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu 
> wrote:
> > > >
> > > >> Thanks Dong for driving this FLIP forward!
> > > >>
> > > >> Introducing  `backlog status` concept for flink job makes sense to
> me as
> > > >> following reasons:
> > > >>
> > > >> From concept/API design perspective, it’s more general and natural
> than
> > > >> above proposals as it can be used in HybridSource for bounded
> records, CDC
> > > >> Source for history snapshot and general sources like KafkaSource for
> > > >> historical messages.
> > > >>
> > > >> From user cases/requirements, I’ve seen many users manually to set
> larger
> > > >> checkpoint interval during backfilling and then set a shorter
> checkpoint
> > > >> interval for real-time processing in their production environments
> as a
> > > >> flink application optimization. Now, the flink framework can make
> this
> > > >> optimization no longer require the user to set the checkpoint
> interval and
> > > >> restart the job multiple times.
> > > >>
> > > >> Following supporting using larger checkpoint for job under backlog
> status
> > > >> in current FLIP, we can explore supporting larger
> parallelism/memory/cpu
> > > >> for job under backlog status in the future.
> > > >>
> > > >> In short, the updated FLIP looks good to me.
> > > >>
> > > >>
> > > >> Best,
> > > >> Leonard
> > > >>
> > > >>
> > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin 
> wrote:
> > > >>>
> > > >>> Hi Piotr,
> > > >>>
> > > >>> Thanks again for proposing the isProcessingBacklog concept.
> > > >>>
> > > >>> After discussing with Becket Qin and thinking about this more, I
> agree it
> > > >>> is a better idea to add a top-level concept to all source
> operators to
> > > >>> address the target use-case.
> > > >>>
> > > >>> The main reason that changed my mind is that isProcessingBacklog
> can be
> > > >>> described as an inherent/nature attribute of every source instance
> and
> > > >> its
> > > >>> semantics does not need to depend on any specific checkpointing
> policy.
> > > >>> Also, we can hardcode the isProcessingBacklog behavior for the
> sources we
> > > >>> have considered so far (e.g. HybridSource and MySQL CDC source)
> without
> > > >>> asking users to explicitly configure the per-source behavior, which
> > > >> indeed
> > > >>> provides better user experience.
> > > >>>
> > > >>> I have updated the FLIP based on the latest suggestions. The
> latest FLIP
> > > >> no
> > > >>> longer introduces per-source config that can be used by end-users.
> While
> > > >> I
> > > >>> agree with you that CheckpointTrigger can be a useful feature to
> address
> > > >>> additional use-cases, I am not sure it is necessary for the
> use-case
> > > >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger
> separately
> > > >>> in another FLIP?
> > > >>>
> > > >>> Can you help take another look at the updated FLIP?
> > > >>>
> > > >>> Best,
> > > >>> Dong
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <
> pnowoj...@apache.org>
> > > >>> wrote:
> > > >>>
> > >  Hi Dong,
> > > 
> > > > Suppose there are 1000 subtask and each subtask has 1% chance of
> being
> > > > "backpressured" at a given time (due to random traffic spikes).
> Then at
> > >  any
> > > > given time, the chance of the job
> > > > being considered not-backpressured = (1-0.01)^1000. Since we
> evaluate
> > > >> the
> > > > backpressure metric once a second, the estimated time for the job
> > > > to be considered not-backpressured is roughly 1 /
> ((1-0.01)^1000) =
> > > >> 23163
> > > > sec = 6.4 hours.
> > > 

Re: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement

2023-06-27 Thread yuxia
Hi, Jing.
Thanks for pointing it out. Yes, it's a typo. It should be "option". I have now 
updated the FLIP.
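
For readers following along, a rough sketch of what the proposed statement family and the 
discussed option would look like from the Table API (the option name is the one proposed in 
the FLIP and may still change; the table and column names are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReplaceTableAsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Option proposed in this FLIP (name not final): make CTAS/RTAS atomic,
        // provided the target table's sink implements SupportsStaging.
        tEnv.getConfig().set("table.rtas-ctas.atomicity-enabled", "true");

        // REPLACE TABLE AS SELECT: replaces an existing table with the query result,
        // without having to drop and re-create it manually.
        tEnv.executeSql("REPLACE TABLE target_tbl AS SELECT id, name FROM source_tbl");

        // CREATE OR REPLACE TABLE AS SELECT: creates the table if it does not exist,
        // otherwise replaces it.
        tEnv.executeSql("CREATE OR REPLACE TABLE target_tbl AS SELECT id, name FROM source_tbl");
    }
}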

Best regards,
Yuxia

- Original Message -
From: "Jing Ge" 
To: "dev" 
Cc: "zhangmang1" 
Sent: Tuesday, June 27, 2023, 4:26:20 PM
Subject: Re: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement

Hi Yuxia,

Thanks for the proposal. Many engines like Snowflake, Databricks support
it. +1

"3:Check the atomicity is enabled, it requires both the options
table.rtas-ctas.atomicity-enabled is set to true and the corresponding
table sink implementation SupportsStaging."

Typo? "Option" instead of "options"? It sounds like there are more options
that need to be set.

Best regards,
jing




On Tue, Jun 27, 2023 at 8:37 AM yuxia  wrote:

> Hi, all.
> Thanks for the feedback.
>
> If there are no other questions or concerns for the FLIP[1], I'd like to
> start the vote tomorrow (6.28).
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
>
> Best regards,
> Yuxia
>
>
> From: "zhangmang1" 
> To: "dev" , luoyu...@alumni.sjtu.edu.cn
> Sent: Tuesday, June 27, 2023, 12:03:35 PM
> Subject: Re:Re: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement
>
> Hi yuxia,
>
> +1 for this new feature.
> In particular, the CREATE OR REPLACE TABLE syntax is more usable and
> faster for users.
>
>
>
>
>
> --
> Best regards,
> Mang Zhang
>
>
>
>
> At 2023-06-26 09:46:40, "yuxia"  wrote:
> >Hi, folks.
> >To save the time of reviewers, I would like to summary the main changes
> of this FLIP[1]. The FLIP is just to introduce REPLACE TABLE AS SELECT
> statement which is almost similar to CREATE TABLE AS SELECT statement, and
> a syntax CREATE OR REPLACE TABLE AS to wrap both. This FLIP is try to
> complete such kinds of statement.
> >
> >The changes are as follows:
> >1: Add enum REPLACE_TABLE_AS, CREATE_OR_REPLACE_TABLE_AS in
> StagingPurpose which is proposed in FLIP-305[2].
> >
> >2: Change the configuration from `table.ctas.atomicity-enabled` proposed
> in FLIP-305[2] to `table.rtas-ctas.atomicity-enabled` to make it take
> effect not only for create table as, but for replace table as && create or
> replace table as. The main reason is that these statements are almost same
> which belongs to same statement family and I would not like to introduce a
> new different configuration which actually do the same thing. Also, IIRC,
> in the offline dicussion about FLIP-218[1], it also wants to introduce
> `table.rtas-ctas.atomicity-enabled`, but as FLIP-218 is only to support
> CTAS, it's not suitable to introduce a configuration implying rtas which is
> not supported. So, we change the configuration to
> `table.ctas.atomicity-enabled`. Since CTAS has been supported, I think it's
> reasonable to revist it and introduce `table.rtas-ctas.atomicity-enabled` a
> to unify them in this FLIP for supporting REPLACE TABLE AS statement.
> >
> >
> >Again, look forward to your feedback.
> >
> >[1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
> >[2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >[3]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185
> >
> >Best regards,
> >Yuxia
> >
> >- Original Message -
> >From: "yuxia" 
> >To: "dev" 
> >Sent: Thursday, June 15, 2023, 7:58:27 PM
> >Subject: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement
> >
> >Hi, devs.
> >As the FLIPs FLIP-218[1] & FLIP-305[2] for Flink to supports CREATE TABLE
> AS SELECT statement has been accepted.
> >I would like to start a discussion about FLIP-303: Support REPLACE TABLE
> AS SELECT+statement[3] to complete such kinds of statements.
> >With REPLACE TABLE AS SELECT statement, users won't need to drop the
> table firstly, and use CREATE TABLE AS SELECT then. Since the statement is
> much similar to CREATE TABLE AS statement, the design is much similar to
> FLIP-218[1] & FLIP-305[2] apart from some parts specified to REPLACE TABLE
> AS SELECT statement.
> >Just kindly remind, to understand this FLIP better, you may need read
> FLIP-218[1] & FLIP-305[2] to get more context.
> >
> >Look forward to your feedback.
> >
> >[1]:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185
> >[2]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >[3]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
> >
> >:) just notice I miss "[DISCUSS]" in the title of the previous email [4],
> so I send it again here with the correct email title. Please ignore the
> previous email and discuss in this thread.
> >Sorry for the noise.
> >
> >[4]: https://lists.apache.org/thread/jy39xwxn1o2035y5411xynwtbyfgg76t
> >
> >
> >Best regards,
> >Yuxia
>
>


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Zhu Zhu
Thanks Dong and Yunfeng for creating this FLIP and driving this discussion.

The new design looks generally good to me. Increasing the checkpoint
interval when the job is processing backlogs is easier for users to
understand and can help in more scenarios.

I have one comment about the new configuration.
Naming the new configuration
"execution.checkpointing.interval-during-backlog" would be better
according to Flink config naming convention.
It is also because nested config keys should be avoided. See
FLINK-29372 for more details.
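
For illustration only, the end result of the proposal would look roughly like this from a
user's point of view (a sketch assuming the option name suggested above, with made-up
interval values):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BacklogIntervalSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Regular interval, used once the sources have caught up with real-time data.
        conf.setString("execution.checkpointing.interval", "30 s");
        // Larger interval, used while a source reports that it is processing backlog
        // (option name as suggested in this thread; not final).
        conf.setString("execution.checkpointing.interval-during-backlog", "10 min");

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual ...
    }
}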

Thanks,
Zhu

Jingsong Li  wrote on Tue, Jun 27, 2023, at 15:45:
>
> Looks good to me!
>
> Thanks Dong, Yunfeng and all for your discussion and design.
>
> Best,
> Jingsong
>
> On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
> >
> > Thank you Dong for driving this FLIP.
> >
> > The new design looks good to me!
> >
> > Best,
> > Jark
> >
> > > On Jun 27, 2023, at 14:38, Dong Lin  wrote:
> > >
> > > Thank you Leonard for the review!
> > >
> > > Hi Piotr, do you have any comments on the latest proposal?
> > >
> > > I am wondering if it is OK to start the voting thread this week.
> > >
> > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:
> > >
> > >> Thanks Dong for driving this FLIP forward!
> > >>
> > >> Introducing  `backlog status` concept for flink job makes sense to me as
> > >> following reasons:
> > >>
> > >> From concept/API design perspective, it’s more general and natural than
> > >> above proposals as it can be used in HybridSource for bounded records, 
> > >> CDC
> > >> Source for history snapshot and general sources like KafkaSource for
> > >> historical messages.
> > >>
> > >> From user cases/requirements, I’ve seen many users manually to set larger
> > >> checkpoint interval during backfilling and then set a shorter checkpoint
> > >> interval for real-time processing in their production environments as a
> > >> flink application optimization. Now, the flink framework can make this
> > >> optimization no longer require the user to set the checkpoint interval 
> > >> and
> > >> restart the job multiple times.
> > >>
> > >> Following supporting using larger checkpoint for job under backlog status
> > >> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> > >> for job under backlog status in the future.
> > >>
> > >> In short, the updated FLIP looks good to me.
> > >>
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >>
> > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> > >>>
> > >>> Hi Piotr,
> > >>>
> > >>> Thanks again for proposing the isProcessingBacklog concept.
> > >>>
> > >>> After discussing with Becket Qin and thinking about this more, I agree 
> > >>> it
> > >>> is a better idea to add a top-level concept to all source operators to
> > >>> address the target use-case.
> > >>>
> > >>> The main reason that changed my mind is that isProcessingBacklog can be
> > >>> described as an inherent/nature attribute of every source instance and
> > >> its
> > >>> semantics does not need to depend on any specific checkpointing policy.
> > >>> Also, we can hardcode the isProcessingBacklog behavior for the sources 
> > >>> we
> > >>> have considered so far (e.g. HybridSource and MySQL CDC source) without
> > >>> asking users to explicitly configure the per-source behavior, which
> > >> indeed
> > >>> provides better user experience.
> > >>>
> > >>> I have updated the FLIP based on the latest suggestions. The latest FLIP
> > >> no
> > >>> longer introduces per-source config that can be used by end-users. While
> > >> I
> > >>> agree with you that CheckpointTrigger can be a useful feature to address
> > >>> additional use-cases, I am not sure it is necessary for the use-case
> > >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger 
> > >>> separately
> > >>> in another FLIP?
> > >>>
> > >>> Can you help take another look at the updated FLIP?
> > >>>
> > >>> Best,
> > >>> Dong
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> > >>> wrote:
> > >>>
> >  Hi Dong,
> > 
> > > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > > "backpressured" at a given time (due to random traffic spikes). Then 
> > > at
> >  any
> > > given time, the chance of the job
> > > being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> > >> the
> > > backpressure metric once a second, the estimated time for the job
> > > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> > >> 23163
> > > sec = 6.4 hours.
> > >
> > > This means that the job will effectively always use the longer
> > > checkpointing interval. It looks like a real concern, right?
> > 
> >  Sorry I don't understand where you are getting those numbers from.
> >  Instead of trying to find loophole after loophole, could you try to
> > >> think
> >  how a given loophole could be improved/solved?
> > 
> > > Hmm... I honestly think it will be useful to know 

Re: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement

2023-06-27 Thread Jing Ge
Hi Yuxia,

Thanks for the proposal. Many engines like Snowflake, Databricks support
it. +1

"3:Check the atomicity is enabled, it requires both the options
table.rtas-ctas.atomicity-enabled is set to true and the corresponding
table sink implementation SupportsStaging."

Typo? "Option" instead of "options"? It sounds like there are more options
that need to be set.

Best regards,
jing




On Tue, Jun 27, 2023 at 8:37 AM yuxia  wrote:

> Hi, all.
> Thanks for the feedback.
>
> If there are no other questions or concerns for the FLIP[1], I'd like to
> start the vote tomorrow (6.28).
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
>
> Best regards,
> Yuxia
>
>
> From: "zhangmang1" 
> To: "dev" , luoyu...@alumni.sjtu.edu.cn
> Sent: Tuesday, June 27, 2023, 12:03:35 PM
> Subject: Re:Re: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement
>
> Hi yuxia,
>
> +1 for this new feature.
> In particular, the CREATE OR REPLACE TABLE syntax is more usable and
> faster for users.
>
>
>
>
>
> --
> Best regards,
> Mang Zhang
>
>
>
>
> At 2023-06-26 09:46:40, "yuxia"  wrote:
> >Hi, folks.
> >To save the time of reviewers, I would like to summary the main changes
> of this FLIP[1]. The FLIP is just to introduce REPLACE TABLE AS SELECT
> statement which is almost similar to CREATE TABLE AS SELECT statement, and
> a syntax CREATE OR REPLACE TABLE AS to wrap both. This FLIP is try to
> complete such kinds of statement.
> >
> >The changes are as follows:
> >1: Add enum REPLACE_TABLE_AS, CREATE_OR_REPLACE_TABLE_AS in
> StagingPurpose which is proposed in FLIP-305[2].
> >
> >2: Change the configuration from `table.ctas.atomicity-enabled` proposed
> in FLIP-305[2] to `table.rtas-ctas.atomicity-enabled` to make it take
> effect not only for create table as, but for replace table as && create or
> replace table as. The main reason is that these statements are almost same
> which belongs to same statement family and I would not like to introduce a
> new different configuration which actually do the same thing. Also, IIRC,
> in the offline dicussion about FLIP-218[1], it also wants to introduce
> `table.rtas-ctas.atomicity-enabled`, but as FLIP-218 is only to support
> CTAS, it's not suitable to introduce a configuration implying rtas which is
> not supported. So, we change the configuration to
> `table.ctas.atomicity-enabled`. Since CTAS has been supported, I think it's
> reasonable to revist it and introduce `table.rtas-ctas.atomicity-enabled` a
> to unify them in this FLIP for supporting REPLACE TABLE AS statement.
> >
> >
> >Again, look forward to your feedback.
> >
> >[1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
> >[2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >[3]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185
> >
> >Best regards,
> >Yuxia
> >
> >- Original Message -
> >From: "yuxia" 
> >To: "dev" 
> >Sent: Thursday, June 15, 2023, 7:58:27 PM
> >Subject: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement
> >
> >Hi, devs.
> >As the FLIPs FLIP-218[1] & FLIP-305[2] for Flink to supports CREATE TABLE
> AS SELECT statement has been accepted.
> >I would like to start a discussion about FLIP-303: Support REPLACE TABLE
> AS SELECT+statement[3] to complete such kinds of statements.
> >With REPLACE TABLE AS SELECT statement, users won't need to drop the
> table firstly, and use CREATE TABLE AS SELECT then. Since the statement is
> much similar to CREATE TABLE AS statement, the design is much similar to
> FLIP-218[1] & FLIP-305[2] apart from some parts specified to REPLACE TABLE
> AS SELECT statement.
> >Just kindly remind, to understand this FLIP better, you may need read
> FLIP-218[1] & FLIP-305[2] to get more context.
> >
> >Look forward to your feedback.
> >
> >[1]:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185
> >[2]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >[3]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
> >
> >:) just notice I miss "[DISCUSS]" in the title of the previous email [4],
> so I send it again here with the correct email title. Please ignore the
> previous email and discuss in this thread.
> >Sorry for the noise.
> >
> >[4]: https://lists.apache.org/thread/jy39xwxn1o2035y5411xynwtbyfgg76t
> >
> >
> >Best regards,
> >Yuxia
>
>


[jira] [Created] (FLINK-32445) BlobStore.closeAndCleanupAllData doesn't do any close action

2023-06-27 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-32445:
-

 Summary: BlobStore.closeAndCleanupAllData doesn't do any close 
action
 Key: FLINK-32445
 URL: https://issues.apache.org/jira/browse/FLINK-32445
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Matthias Pohl


We might want to refactor {{BlobStore.closeAndCleanupAllData}}: It doesn't 
close any resources (and doesn't need to). Therefore, renaming the interface's 
method to {{cleanAllData}} seems to be more appropriate.

This enables us to remove redundant code in 
{{AbstractHaServices.closeAndCleanupAllData}} and {{AbstractHaServices.close}}.
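
A purely illustrative sketch of the rename; the simplified interface below is hypothetical 
and not the actual Flink code, it only shows the shape of the change:

// Hypothetical, simplified shape -- not the real Flink BlobStore interface.
public interface BlobStoreCleanupSketch {

    // Before: closeAndCleanupAllData() -- the name suggests resources are closed,
    // although nothing actually needs to be closed here.

    /** After: only removes all stored blob data; no close semantics implied. */
    void cleanAllData();
}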



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Jingsong Li
Looks good to me!

Thanks Dong, Yunfeng and all for your discussion and design.

Best,
Jingsong

On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
>
> Thank you Dong for driving this FLIP.
>
> The new design looks good to me!
>
> Best,
> Jark
>
> > On Jun 27, 2023, at 14:38, Dong Lin  wrote:
> >
> > Thank you Leonard for the review!
> >
> > Hi Piotr, do you have any comments on the latest proposal?
> >
> > I am wondering if it is OK to start the voting thread this week.
> >
> > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:
> >
> >> Thanks Dong for driving this FLIP forward!
> >>
> >> Introducing  `backlog status` concept for flink job makes sense to me as
> >> following reasons:
> >>
> >> From concept/API design perspective, it’s more general and natural than
> >> above proposals as it can be used in HybridSource for bounded records, CDC
> >> Source for history snapshot and general sources like KafkaSource for
> >> historical messages.
> >>
> >> From user cases/requirements, I’ve seen many users manually to set larger
> >> checkpoint interval during backfilling and then set a shorter checkpoint
> >> interval for real-time processing in their production environments as a
> >> flink application optimization. Now, the flink framework can make this
> >> optimization no longer require the user to set the checkpoint interval and
> >> restart the job multiple times.
> >>
> >> Following supporting using larger checkpoint for job under backlog status
> >> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> >> for job under backlog status in the future.
> >>
> >> In short, the updated FLIP looks good to me.
> >>
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>> On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> Thanks again for proposing the isProcessingBacklog concept.
> >>>
> >>> After discussing with Becket Qin and thinking about this more, I agree it
> >>> is a better idea to add a top-level concept to all source operators to
> >>> address the target use-case.
> >>>
> >>> The main reason that changed my mind is that isProcessingBacklog can be
> >>> described as an inherent/nature attribute of every source instance and
> >> its
> >>> semantics does not need to depend on any specific checkpointing policy.
> >>> Also, we can hardcode the isProcessingBacklog behavior for the sources we
> >>> have considered so far (e.g. HybridSource and MySQL CDC source) without
> >>> asking users to explicitly configure the per-source behavior, which
> >> indeed
> >>> provides better user experience.
> >>>
> >>> I have updated the FLIP based on the latest suggestions. The latest FLIP
> >> no
> >>> longer introduces per-source config that can be used by end-users. While
> >> I
> >>> agree with you that CheckpointTrigger can be a useful feature to address
> >>> additional use-cases, I am not sure it is necessary for the use-case
> >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> >>> in another FLIP?
> >>>
> >>> Can you help take another look at the updated FLIP?
> >>>
> >>> Best,
> >>> Dong
> >>>
> >>>
> >>>
> >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> >>> wrote:
> >>>
>  Hi Dong,
> 
> > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
>  any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> >> the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> >> 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
> 
>  Sorry I don't understand where you are getting those numbers from.
>  Instead of trying to find loophole after loophole, could you try to
> >> think
>  how a given loophole could be improved/solved?
> 
> > Hmm... I honestly think it will be useful to know the APIs due to the
> > following reasons.
> 
>  Please propose something. I don't think it's needed.
> 
> > - For the use-case mentioned in FLIP-309 motivation section, would the
>  APIs
> > of this alternative approach be more or less usable?
> 
>  Everything that you originally wanted to achieve in FLIP-309, you could
> >> do
>  as well in my proposal.
>  Vide my many mentions of the "hacky solution".
> 
> > - Can these APIs reliably address the extra use-case (e.g. allow
> > checkpointing interval to change dynamically even during the unbounded
> > phase) as it claims?
> 
>  I don't see why not.
> 
> > - Can these APIs be decoupled from the APIs currently proposed in
>  FLIP-309?
> 
>  Yes
> 
> > For example, if the APIs of this alternative approach can be decoupled

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Jark Wu
Thank you Dong for driving this FLIP. 

The new design looks good to me!

Best,
Jark

> On Jun 27, 2023, at 14:38, Dong Lin  wrote:
> 
> Thank you Leonard for the review!
> 
> Hi Piotr, do you have any comments on the latest proposal?
> 
> I am wondering if it is OK to start the voting thread this week.
> 
> On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:
> 
>> Thanks Dong for driving this FLIP forward!
>> 
>> Introducing  `backlog status` concept for flink job makes sense to me as
>> following reasons:
>> 
>> From concept/API design perspective, it’s more general and natural than
>> above proposals as it can be used in HybridSource for bounded records, CDC
>> Source for history snapshot and general sources like KafkaSource for
>> historical messages.
>> 
>> From user cases/requirements, I’ve seen many users manually to set larger
>> checkpoint interval during backfilling and then set a shorter checkpoint
>> interval for real-time processing in their production environments as a
>> flink application optimization. Now, the flink framework can make this
>> optimization no longer require the user to set the checkpoint interval and
>> restart the job multiple times.
>> 
>> Following supporting using larger checkpoint for job under backlog status
>> in current FLIP, we can explore supporting larger parallelism/memory/cpu
>> for job under backlog status in the future.
>> 
>> In short, the updated FLIP looks good to me.
>> 
>> 
>> Best,
>> Leonard
>> 
>> 
>>> On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks again for proposing the isProcessingBacklog concept.
>>> 
>>> After discussing with Becket Qin and thinking about this more, I agree it
>>> is a better idea to add a top-level concept to all source operators to
>>> address the target use-case.
>>> 
>>> The main reason that changed my mind is that isProcessingBacklog can be
>>> described as an inherent/nature attribute of every source instance and
>> its
>>> semantics does not need to depend on any specific checkpointing policy.
>>> Also, we can hardcode the isProcessingBacklog behavior for the sources we
>>> have considered so far (e.g. HybridSource and MySQL CDC source) without
>>> asking users to explicitly configure the per-source behavior, which
>> indeed
>>> provides better user experience.
>>> 
>>> I have updated the FLIP based on the latest suggestions. The latest FLIP
>> no
>>> longer introduces per-source config that can be used by end-users. While
>> I
>>> agree with you that CheckpointTrigger can be a useful feature to address
>>> additional use-cases, I am not sure it is necessary for the use-case
>>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
>>> in another FLIP?
>>> 
>>> Can you help take another look at the updated FLIP?
>>> 
>>> Best,
>>> Dong
>>> 
>>> 
>>> 
>>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
>>> wrote:
>>> 
 Hi Dong,
 
> Suppose there are 1000 subtask and each subtask has 1% chance of being
> "backpressured" at a given time (due to random traffic spikes). Then at
 any
> given time, the chance of the job
> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
>> the
> backpressure metric once a second, the estimated time for the job
> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
>> 23163
> sec = 6.4 hours.
> 
> This means that the job will effectively always use the longer
> checkpointing interval. It looks like a real concern, right?
 
 Sorry I don't understand where you are getting those numbers from.
 Instead of trying to find loophole after loophole, could you try to
>> think
 how a given loophole could be improved/solved?
 
> Hmm... I honestly think it will be useful to know the APIs due to the
> following reasons.
 
 Please propose something. I don't think it's needed.
 
> - For the use-case mentioned in FLIP-309 motivation section, would the
 APIs
> of this alternative approach be more or less usable?
 
 Everything that you originally wanted to achieve in FLIP-309, you could
>> do
 as well in my proposal.
 Vide my many mentions of the "hacky solution".
 
> - Can these APIs reliably address the extra use-case (e.g. allow
> checkpointing interval to change dynamically even during the unbounded
> phase) as it claims?
 
 I don't see why not.
 
> - Can these APIs be decoupled from the APIs currently proposed in
 FLIP-309?
 
 Yes
 
> For example, if the APIs of this alternative approach can be decoupled
 from
> the APIs currently proposed in FLIP-309, then it might be reasonable to
> work on this extra use-case with a more advanced/complicated design
> separately in a followup work.
 
 As I voiced my concerns previously, the current design of FLIP-309 would
 clog the public API and in the long run confuse the users. IMO It's
 

Re: [VOTE] Release flink-connector-jdbc v3.1.1, release candidate #2

2023-06-27 Thread Martijn Visser
+1 (binding)

- Validated hashes
- Verified signature
- Verified that no binaries exist in the source archive
- Built the source with Maven
- Verified licenses
- Verified web PRs

On Tue, Jun 27, 2023 at 1:42 AM Sergey Nuyanzin  wrote:

> +1 (non-binding)
>
> - verified hashes
> - verified signatures
> - built from sources
> - checked release notes
> - review web pr
>
> Non-blocking finding:
>   it seems the release date in web PR should be changed
>
> On Mon, Jun 26, 2023 at 1:34 PM Leonard Xu  wrote:
>
> > +1 (binding)
> >
> > - built from source code succeeded
> > - verified signatures
> > - verified hashsums
> > - checked release notes
> > - checked the contents contains jar and pom files in apache repo
> > - reviewed the web PR
> >
> > Best,
> > Leonard
> >
> > > On Jun 19, 2023, at 6:57 PM, Danny Cranmer 
> > wrote:
> > >
> > > Thanks for driving this Martijn.
> > >
> > > +1 (binding)
> > >
> > > - Reviewed web PR
> > > - Jira release notes look good
> > > - Tag exists in Github
> > > - Source archive signature/checksum looks good
> > > - Binary (from Maven) signature/checksum looks good
> > > - No binaries in the source archive
> > > - Source archive builds from source and tests pass
> > > - CI passes [1]
> > >
> > > Non blocking findings:
> > > - NOTICE files year is 2022 and needs to be updated to 2023
> > > - pom.xml is referencing Flink 1.17.0 and can be updated to 1.17.1
> > > - Some unit tests (notably OracleExactlyOnceSinkE2eTest) appear to be
> > > integration/e2e and are run in the unit test suite
> > >
> > > Thanks,
> > > Danny
> > >
> > > [1]
> > https://github.com/apache/flink-connector-jdbc/actions/runs/5278297177
> > >
> > > On Thu, Jun 15, 2023 at 12:40 PM Martijn Visser <
> > martijnvis...@apache.org>
> > > wrote:
> > >
> > >> Hi everyone,
> > >> Please review and vote on the release candidate #2 for the version
> > 3.1.1,
> > >> as follows:
> > >> [ ] +1, Approve the release
> > >> [ ] -1, Do not approve the release (please provide specific comments)
> > >>
> > >>
> > >> The complete staging area is available for your review, which
> includes:
> > >> * JIRA release notes [1],
> > >> * the official Apache source release to be deployed to
> dist.apache.org
> > >> [2],
> > >> which are signed with the key with fingerprint
> > >> A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> > >> * all artifacts to be deployed to the Maven Central Repository [4],
> > >> * source code tag v3.1.1-rc2 [5],
> > >> * website pull request listing the new release [6].
> > >>
> > >> The vote will be open for at least 72 hours. It is adopted by majority
> > >> approval, with at least 3 PMC affirmative votes.
> > >>
> > >> Thanks,
> > >> Release Manager
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353281
> > >> [2]
> > >>
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-jdbc-3.1.1-rc2
> > >> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > >> [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1642
> > >> [5] https://github.com/apache/flink-connector-
> > >> /releases/tag/v3.1.1-rc2
> > >> [6] https://github.com/apache/flink-web/pull/654
> > >>
> >
> >
>
> --
> Best regards,
> Sergey
>


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thank you Leonard for the review!

Hi Piotr, do you have any comments on the latest proposal?

I am wondering if it is OK to start the voting thread this week.

On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:

> Thanks Dong for driving this FLIP forward!
>
> Introducing  `backlog status` concept for flink job makes sense to me as
> following reasons:
>
> From concept/API design perspective, it’s more general and natural than
> above proposals as it can be used in HybridSource for bounded records, CDC
> Source for history snapshot and general sources like KafkaSource for
> historical messages.
>
> From user cases/requirements, I’ve seen many users manually to set larger
> checkpoint interval during backfilling and then set a shorter checkpoint
> interval for real-time processing in their production environments as a
> flink application optimization. Now, the flink framework can make this
> optimization no longer require the user to set the checkpoint interval and
> restart the job multiple times.
>
> Following supporting using larger checkpoint for job under backlog status
> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> for job under backlog status in the future.
>
> In short, the updated FLIP looks good to me.
>
>
> Best,
> Leonard
>
>
> > On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> >
> > Hi Piotr,
> >
> > Thanks again for proposing the isProcessingBacklog concept.
> >
> > After discussing with Becket Qin and thinking about this more, I agree it
> > is a better idea to add a top-level concept to all source operators to
> > address the target use-case.
> >
> > The main reason that changed my mind is that isProcessingBacklog can be
> > described as an inherent/nature attribute of every source instance and
> its
> > semantics does not need to depend on any specific checkpointing policy.
> > Also, we can hardcode the isProcessingBacklog behavior for the sources we
> > have considered so far (e.g. HybridSource and MySQL CDC source) without
> > asking users to explicitly configure the per-source behavior, which
> indeed
> > provides better user experience.
> >
> > I have updated the FLIP based on the latest suggestions. The latest FLIP
> no
> > longer introduces per-source config that can be used by end-users. While
> I
> > agree with you that CheckpointTrigger can be a useful feature to address
> > additional use-cases, I am not sure it is necessary for the use-case
> > targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> > in another FLIP?
> >
> > Can you help take another look at the updated FLIP?
> >
> > Best,
> > Dong
> >
> >
> >
> > On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> > wrote:
> >
> >> Hi Dong,
> >>
> >>> Suppose there are 1000 subtask and each subtask has 1% chance of being
> >>> "backpressured" at a given time (due to random traffic spikes). Then at
> >> any
> >>> given time, the chance of the job
> >>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> the
> >>> backpressure metric once a second, the estimated time for the job
> >>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> 23163
> >>> sec = 6.4 hours.
> >>>
> >>> This means that the job will effectively always use the longer
> >>> checkpointing interval. It looks like a real concern, right?
> >>
> >> Sorry I don't understand where you are getting those numbers from.
> >> Instead of trying to find loophole after loophole, could you try to
> think
> >> how a given loophole could be improved/solved?
> >>
> >>> Hmm... I honestly think it will be useful to know the APIs due to the
> >>> following reasons.
> >>
> >> Please propose something. I don't think it's needed.
> >>
> >>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >> APIs
> >>> of this alternative approach be more or less usable?
> >>
> >> Everything that you originally wanted to achieve in FLIP-309, you could
> do
> >> as well in my proposal.
> >> Vide my many mentions of the "hacky solution".
> >>
> >>> - Can these APIs reliably address the extra use-case (e.g. allow
> >>> checkpointing interval to change dynamically even during the unbounded
> >>> phase) as it claims?
> >>
> >> I don't see why not.
> >>
> >>> - Can these APIs be decoupled from the APIs currently proposed in
> >> FLIP-309?
> >>
> >> Yes
> >>
> >>> For example, if the APIs of this alternative approach can be decoupled
> >> from
> >>> the APIs currently proposed in FLIP-309, then it might be reasonable to
> >>> work on this extra use-case with a more advanced/complicated design
> >>> separately in a followup work.
> >>
> >> As I voiced my concerns previously, the current design of FLIP-309 would
> >> clog the public API and in the long run confuse the users. IMO It's
> >> addressing the
> >> problem in the wrong place.
> >>
> >>> Hmm.. do you mean we can do the following:
> >>> - Have all source operators emit a metric named "processingBacklog".
> >>> - Add a job-level config that 

Re: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement

2023-06-27 Thread yuxia
Hi, all. 
Thanks for the feedback. 

If there are no other questions or concerns for the FLIP[1], I'd like to start 
the vote tomorrow (6.28). 

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
 

Best regards, 
Yuxia 


From: "zhangmang1" 
To: "dev" , luoyu...@alumni.sjtu.edu.cn 
Sent: Tuesday, June 27, 2023, 12:03:35 PM 
Subject: Re:Re: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement 

Hi yuxia, 

+1 for this new feature. 
In particular, the CREATE OR REPLACE TABLE syntax is more usable and faster for 
users. 





-- 
Best regards, 
Mang Zhang 




At 2023-06-26 09:46:40, "yuxia"  wrote:
>Hi, folks.
>To save the time of reviewers, I would like to summary the main changes of 
>this FLIP[1]. The FLIP is just to introduce REPLACE TABLE AS SELECT statement 
>which is almost similar to CREATE TABLE AS SELECT statement, and a syntax 
>CREATE OR REPLACE TABLE AS to wrap both. This FLIP is try to complete such 
>kinds of statement. 
>
>The changes are as follows:
>1: Add enum REPLACE_TABLE_AS, CREATE_OR_REPLACE_TABLE_AS in StagingPurpose 
>which is proposed in FLIP-305[2].
>
>2: Change the configuration from `table.ctas.atomicity-enabled` proposed in 
>FLIP-305[2] to `table.rtas-ctas.atomicity-enabled` to make it take effect not 
>only for create table as, but for replace table as && create or replace table 
>as. The main reason is that these statements are almost same which belongs to 
>same statement family and I would not like to introduce a new different 
>configuration which actually do the same thing. Also, IIRC, in the offline 
>dicussion about FLIP-218[1], it also wants to introduce 
>`table.rtas-ctas.atomicity-enabled`, but as FLIP-218 is only to support CTAS, 
>it's not suitable to introduce a configuration implying rtas which is not 
>supported. So, we change the configuration to `table.ctas.atomicity-enabled`. 
>Since CTAS has been supported, I think it's reasonable to revist it and 
>introduce `table.rtas-ctas.atomicity-enabled` a to unify them in this FLIP for 
>supporting REPLACE TABLE AS statement.
>
>
>Again, look forward to your feedback.
>
>[1] 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
> 
>[2] 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> 
>[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185 
>
>Best regards,
>Yuxia
>
>- Original Message -
>From: "yuxia" 
>To: "dev" 
>Sent: Thursday, June 15, 2023, 7:58:27 PM
>Subject: [DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement
>
>Hi, devs. 
>As the FLIPs FLIP-218[1] & FLIP-305[2] for Flink to supports CREATE TABLE AS 
>SELECT statement has been accepted. 
>I would like to start a discussion about FLIP-303: Support REPLACE TABLE AS 
>SELECT+statement[3] to complete such kinds of statements. 
>With REPLACE TABLE AS SELECT statement, users won't need to drop the table 
>firstly, and use CREATE TABLE AS SELECT then. Since the statement is much 
>similar to CREATE TABLE AS statement, the design is much similar to 
>FLIP-218[1] & FLIP-305[2] apart from some parts specified to REPLACE TABLE AS 
>SELECT statement. 
>Just kindly remind, to understand this FLIP better, you may need read 
>FLIP-218[1] & FLIP-305[2] to get more context. 
>
>Look forward to your feedback. 
>
>[1]: 
>https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185 
>[2]: 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> 
>[3]: 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
> 
>
>:) just notice I miss "[DISCUSS]" in the title of the previous email [4], so I 
>send it again here with the correct email title. Please ignore the previous 
>email and discuss in this thread. 
>Sorry for the noise. 
>
>[4]: https://lists.apache.org/thread/jy39xwxn1o2035y5411xynwtbyfgg76t 
>
>
>Best regards, 
>Yuxia 



[jira] [Created] (FLINK-32444) Enable object reuse for Flink SQL jobs by default

2023-06-27 Thread Jark Wu (Jira)
Jark Wu created FLINK-32444:
---

 Summary: Enable object reuse for Flink SQL jobs by default
 Key: FLINK-32444
 URL: https://issues.apache.org/jira/browse/FLINK-32444
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Jark Wu
 Fix For: 1.18.0


Currently, object reuse is not enabled by default for Flink streaming jobs, but it 
is enabled by default for Flink batch jobs. That is inconsistent with stream-batch 
unification. Besides, SQL operators are safe to run with object reuse, and enabling 
it is a great performance improvement for SQL jobs. 

We should also be careful with the Table-DataStream conversion case 
(StreamTableEnvironment), which is not safe for enabling object reuse by default. 
Maybe we can just enable it for the SQL Client/Gateway and TableEnvironment. 
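
For context, a minimal sketch of how object reuse is enabled explicitly today for a 
streaming job; the ticket proposes making this the default for SQL jobs:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Opt-in today; equivalent to setting pipeline.object-reuse=true in the configuration.
        env.getConfig().enableObjectReuse();
    }
}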



--
This message was sent by Atlassian Jira
(v8.20.10#820010)