[jira] [Created] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

2023-03-15 Thread Ruibin Xing (Jira)
Ruibin Xing created FLINK-31483:
---

 Summary: Implement Split Deletion Support in Flink Kafka Connector
 Key: FLINK-31483
 URL: https://issues.apache.org/jira/browse/FLINK-31483
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka, Connectors / Parent
Reporter: Ruibin Xing


Currently, the Flink Kafka Connector does not support split deletion; it is 
left as a 
[TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
I want to add this feature with the following steps:

1. Add a SplitsDeletion event to flink-connector-base, which currently only has 
SplitsAddition.
2. Add a `deleteSplits` method to SplitEnumeratorContext so it can send a 
SplitsDeletion event to the source operator. To maintain compatibility, a 
default empty implementation of this method will be added.
3. Make SourceOperator handle the SplitsDeletion event, notifying the 
SourceReader to delete splits.
4. Create a `deleteSplits` method in SourceReader to remove splits, including 
removing them from the split state and stopping the SourceReader from reading 
the deleted splits.
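
A minimal sketch of what the additions in steps 1, 2 and 4 could look like. The 
names follow this proposal; none of this is existing Flink API, and the exact 
signatures are open to discussion:

{code:java}
import java.util.List;

// Illustrative sketch only, not the actual Flink API.
public class SplitsDeletionSketch {

    /** Step 1: an event mirroring SplitsAddition, carrying the splits to remove. */
    public static final class SplitsDeletion<SplitT> {
        private final List<SplitT> splits;

        public SplitsDeletion(List<SplitT> splits) {
            this.splits = splits;
        }

        /** Splits the enumerator asks the reader to stop reading and drop from state. */
        public List<SplitT> splits() {
            return splits;
        }
    }

    /** Step 2: SplitEnumeratorContext would gain a deleteSplits method with a no-op default. */
    public interface EnumeratorContextAddition<SplitT> {
        default void deleteSplits(List<SplitT> splits, int subtaskId) {
            // Intentionally empty so existing implementations keep compiling.
        }
    }

    /** Step 4: SourceReader would gain a method to drop split state and stop reading. */
    public interface ReaderAddition<SplitT> {
        void deleteSplits(List<SplitT> splits);
    }
}
{code}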

As an alternative, without modifying the flink-connector-base, 
KafkaSplitsEnumerator could send a custom SourceEvent to SourceOperator for 
splits deletion and deal with it in the kafka-connector-specific code. But I 
think it's better to have SplitsDeletion in flink-connector-base, so other 
connectors can use it too.

Let me know if you have any thoughts or ideas. Thanks!

Related Issues: [FLINK-30490|https://issues.apache.org/jira/browse/FLINK-30490]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31482) support count jobmanager-failed failover times

2023-03-15 Thread Fei Feng (Jira)
Fei Feng created FLINK-31482:


 Summary: support count jobmanager-failed failover times
 Key: FLINK-31482
 URL: https://issues.apache.org/jira/browse/FLINK-31482
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Metrics
Affects Versions: 1.16.1
Reporter: Fei Feng


We have a metric `numRestarts` which indicates how many times a job has failed 
over, but we don't have a metric that indicates the job was recovered from HA 
(high availability).

There are two problems:

1. When a JobManager process crashes, we have no way of knowing from the metric 
system that the JobManager crashed and the job was recovered.

2. When a new JobManager becomes leader, `numRestarts` starts again from zero, 
which sometimes misleads our users. Most users consider a failover caused by a 
JM failure and one caused by a job failure to be the same; the effect, at 
least, is the same.
 
I suggest we:
1. Add a new metric that indicates how many times the job was recovered from HA.
2. Have `numRestarts` also count the restarts caused by recovery from HA.
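
For illustration, a minimal sketch of how such a counter could be registered and 
updated. The metric name "numRestartsFromHa" and the wrapper class are 
hypothetical; only the MetricGroup/Counter API usage is standard Flink:

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/** Illustrative sketch only; "numRestartsFromHa" is a placeholder metric name. */
public class JobManagerFailoverMetrics {

    private final Counter numRestartsFromHa;

    public JobManagerFailoverMetrics(MetricGroup jobMetricGroup) {
        // Register the counter on the job's metric group, next to numRestarts.
        this.numRestartsFromHa = jobMetricGroup.counter("numRestartsFromHa");
    }

    /** Called once when a newly elected JobManager recovers the job from HA. */
    public void onRecoveredFromHa() {
        numRestartsFromHa.inc();
    }
}
{code}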
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31481) Add filter support for ShowDatabases

2023-03-15 Thread Ran Tao (Jira)
Ran Tao created FLINK-31481:
---

 Summary: Add filter support for ShowDatabases
 Key: FLINK-31481
 URL: https://issues.apache.org/jira/browse/FLINK-31481
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Reporter: Ran Tao


As discussed in the FLIP, and to keep each ticket small, this ticket adds filter support for SHOW DATABASES.
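
For illustration, a minimal sketch of what a filtered call could look like from 
the Table API. The LIKE-style filter follows the FLIP proposal and is not yet 
supported syntax; the pattern itself is made up:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Illustrative only: the filtered form is the proposed syntax, not yet available. */
public class ShowDatabasesFilterExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Supported today: list every database in the current catalog.
        tEnv.executeSql("SHOW DATABASES").print();

        // Proposed by the FLIP (hypothetical until this ticket is merged):
        // only list databases matching the pattern.
        tEnv.executeSql("SHOW DATABASES LIKE 'db%'").print();
    }
}
{code}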



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31480) Fix Typo in YarnClusterDescriptor

2023-03-15 Thread Shilun Fan (Jira)
Shilun Fan created FLINK-31480:
--

 Summary: Fix Typo in YarnClusterDescriptor
 Key: FLINK-31480
 URL: https://issues.apache.org/jira/browse/FLINK-31480
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.17.1
Reporter: Shilun Fan


There is a typo in the comment for YarnClusterDescriptor, this jira will fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-15 Thread Yuxin Tan
Hi, Yun

Thanks for your suggestions.

> I think you could describe it explicitly in the original FLIP's goals or
design principles.

I have updated the FLIP and given a more detailed description of the
tier decoupling design.

Best,
Yuxin


Yun Tang  于2023年3月15日周三 20:46写道:

> Hi Yuxin,
>
> Thanks for your explanations.
> I think the kernel idea is that you prefer the simple and decoupling
> design for the 1st version of hybrid shuffle with remote storage. If
> following this idea, perhaps I could accept your current explanations and I
> think you could describe it explicitly in the original FLIP's goals or
> design principles.
>
>
> Best
> Yun Tang
> 
> From: Yuxin Tan 
> Sent: Wednesday, March 15, 2023 12:41
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi, Yun,
>
> Thanks for sharing the ideas.
>
> > 1. We should trigger to kick the shuffle data to remote storage
> once either condition is reached
>
> I believe that configuring two options in this manner is a pragmatic
> approach that can fulfill a wider range of usage scenarios. However,
> if we present two options, it may become difficult to remove them
> in the future once users have started relying on them. On the other
> hand, if we introduce a single option, we can easily incorporate
> additional options based on your recommendations if required.
> Thus, we recommend adopting a one-option solution in the first
> version to address the issue.
>
> > 2. Perhaps we could switch to kick shuffle data to remote storage
> once no space left exception is met.
>
> Thanks for your valuable feedback. While the suggestion is a viable
> solution to address the no space left exception issue, we are
> concerned that implementing it could create interdependence between
> the disk tier and remote storage tier, which would contradict our goal
> of achieving independence between tiers in the new architecture.
> Moreover, we believe that it is better to prevent encountering the
> exception in the first place by reserving adequate disk space. This is
> because other processes on the same machine may also be impacted
> when the exception occurs. If the exception does still arise, we can
> explore other potential solutions through detailed design and
> discussions, such as the one you proposed, optimizing the reserved
> space with a global counter of TM, etc. Although the current implementation
> only partially addresses the exception issue, we expect to improve it
> in subsequent versions due to the complexity of this FLIP. We would
> appreciate hearing your thoughts on this matter.
>
> Best,
> Yuxin
>
>
> Yun Tang  于2023年3月14日周二 14:48写道:
>
> > Hi Yuxin
> >
> > Thanks for your reply.
> > I am not saying that we should use an absolute reserved value to replace
> > the current plan of the reserved fraction. We should trigger to kick the
> > shuffle data to remote storage once either condition is reached.
> > Maybe we could get some idea from the configuration of tiered based state
> > backend [1].
> >
> > The concern about concurrent writing is due to the local disk being
> shared
> > by all the instances running on that node, we cannot ensure other
> > components would not flush data during shuffle writing. Perhaps we could
> > switch to kick shuffle data to remote storage once no space left
> exception
> > is met.
> >
> >
> > [1]
> >
> https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo
> >
> >
> > Best
> > Yun Tang
> >
> > 
> > From: Yuxin Tan 
> > Sent: Monday, March 13, 2023 15:06
> > To: dev@flink.apache.org 
> > Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
> >
> > Hi,
> > Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.
> >
> > @Zhu Zhu
> > > 1. I'm a bit concerned about the cost of it.
> >
> > The Read Client's request to check the existence of a segment in each
> > storage tier is the netty message, which is similar to the credit
> updating
> > messages. It is observed that the netty message cost for these operations
> > is relatively low and the number of messages is also relatively low
> > compared to credit updates which are sent every few buffers. Moreover,
> > since this request involves only memory operations, the message cost
> > is significantly low during the total reading data process.
> >
> > And we will further optimize the message cost later, particularly for
> > segments that remain in the same tier without switching tiers. That
> > is, for consecutive segments in the same tier, we can continue to send
> > the next segment without waiting for the downstream to ask whether the
> > segment exists in this tier.
> >
> >
> > @Xia Sun
> > > 1. how to keep the order of segments between different storage tiers
> >
> > To indicate the sequential order of upstream segments, we rely on
> segmentId
> > downstr

Re: [VOTE] FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null overwrite problem of partial-insert

2023-03-15 Thread Jane Chan
+1 (non-binding)

Best,
Jane

On Thu, Mar 16, 2023 at 7:48 AM Benchao Li  wrote:

> +1 (binding)
>
> Dong Lin  于2023年3月15日周三 23:24写道:
>
> > +1 (binding)
> >
> > On Mon, Mar 13, 2023 at 8:18 PM Lincoln Lee 
> > wrote:
> >
> > > Dear Flink developers,
> > >
> > > Thanks for all your feedback for FLIP-300: Add targetColumns to
> > > DynamicTableSink#Context to solve the null overwrite problem of
> > > partial-insert[1] on the discussion thread[2].
> > >
> > > I'd like to start a vote for it. The vote will be open for at least 72
> > > hours unless there is an objection or not enough votes.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
> > > [2] https://lists.apache.org/thread/bk8x0nqg4oc62jqryj9ntzzlpj062wd9
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


[jira] [Created] (FLINK-31479) Close blocking iterators in tests

2023-03-15 Thread Guojun Li (Jira)
Guojun Li created FLINK-31479:
-

 Summary: Close blocking iterators in tests
 Key: FLINK-31479
 URL: https://issues.apache.org/jira/browse/FLINK-31479
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: Guojun Li


Several blocking iterators are not closed in `ContinuousFileStoreITCase`
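
For context, a minimal sketch of the usual pattern for not leaking such a 
blocking collect() iterator in a test. This is generic Flink API usage, not the 
actual ContinuousFileStoreITCase code:

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

/** Illustrative only: close the iterator even when an assertion fails. */
public class CloseIteratorExample {

    static List<Row> collectRows(CloseableIterator<Row> iterator, int expected) throws Exception {
        List<Row> rows = new ArrayList<>();
        // try-with-resources releases the underlying job/fetcher even if an exception is thrown.
        try (CloseableIterator<Row> it = iterator) {
            while (rows.size() < expected && it.hasNext()) {
                rows.add(it.next());
            }
        }
        return rows;
    }
}
{code}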



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31478) TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream

2023-03-15 Thread Dian Fu (Jira)
Dian Fu created FLINK-31478:
---

 Summary: TypeError: a bytes-like object is required, not 
'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream
 Key: FLINK-31478
 URL: https://issues.apache.org/jira/browse/FLINK-31478
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


{code}

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink,
                                                        OutputFileConfig, RollingPolicy)


word_count_data = ["To be, or not to be,--that is the question:--",
   "Whether 'tis nobler in the mind to suffer",
   "The slings and arrows of outrageous fortune",
   "Or to take arms against a sea of troubles,",
   "And by opposing end them?--To die,--to sleep,--",
   "No more; and by a sleep to say we end",
   "The heartache, and the thousand natural shocks",
   "That flesh is heir to,--'tis a consummation",
   "Devoutly to be wish'd. To die,--to sleep;--",
   "To sleep! perchance to dream:--ay, there's the rub;",
   "For in that sleep of death what dreams may come,",
   "When we have shuffled off this mortal coil,",
   "Must give us pause: there's the respect",
   "That makes calamity of so long life;",
   "For who would bear the whips and scorns of time,",
   "The oppressor's wrong, the proud man's contumely,",
   "The pangs of despis'd love, the law's delay,",
   "The insolence of office, and the spurns",
   "That patient merit of the unworthy takes,",
   "When he himself might his quietus make",
   "With a bare bodkin? who would these fardels bear,",
   "To grunt and sweat under a weary life,",
   "But that the dread of something after death,--",
   "The undiscover'd country, from whose bourn",
   "No traveller returns,--puzzles the will,",
   "And makes us rather bear those ills we have",
   "Than fly to others that we know not of?",
   "Thus conscience does make cowards of us all;",
   "And thus the native hue of resolution",
   "Is sicklied o'er with the pale cast of thought;",
   "And enterprises of great pith and moment,",
   "With this regard, their currents turn awry,",
   "And lose the name of action.--Soft you now!",
   "The fair Ophelia!--Nymph, in thy orisons",
   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()

    # compute

Re: [VOTE] FLIP-293: Introduce Flink Jdbc Driver For Sql Gateway

2023-03-15 Thread ConradJam
Hang Ruan <ruanhang1...@gmail.com> via flink.apache.org, Wed, Mar 15, 17:09, to dev:
+1 (non-binding)

Samrat Deb  于2023年3月15日周三 23:29写道:

> +1 ( non binding)
>
> Bests,
> Samrat
>
> On Wed, 15 Mar 2023 at 8:54 PM, Dong Lin  wrote:
>
> > +1 (binding)
> >
> > On Mon, Mar 13, 2023 at 1:47 PM Shammon FY  wrote:
> >
> > > Hi Devs,
> > >
> > > I'd like to start the vote on FLIP-293: Introduce Flink Jdbc Driver For
> > Sql
> > > Gateway [1].
> > >
> > > The FLIP was discussed in thread [2], and it aims to introduce Flink
> Jdbc
> > > Driver module in Flink.
> > >
> > > The vote will last for at least 72 hours (03/16, 15:00 UTC+8) unless
> > there
> > > is an objection or insufficient vote. Thank you all.
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-293%3A+Introduce+Flink+Jdbc+Driver+For+Sql+Gateway
> > > [2] https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3
> > >
> > >
> > > Best,
> > > Shammon.FY
> > >
> >
>


-- 
Best

ConradJam


Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-03-15 Thread ConradJam
Thanks for starting this discussion.


I have been tracking this problem for a long time. A few days ago I saw a
conversation in the issue [1] FLINK-24998 and learned that the Kryo version
problem affects compiling snapshots with JDK 17.

As @cherry said, it ruined our whole effort towards JDK 17.

I am in favor of providing an external tool to migrate checkpoints from the
old Kryo version to the new Kryo version in one pass (maybe this tool could
start in Flink 2.0?). Are there currently any plans or ideas for such a tool
worth discussing?


I think it should not be difficult to be compatible with both JDK 11 and JDK 17.
We should indeed abandon JDK 8 in 2.0.0; the docs already mention that it is
marked as deprecated [2].


I would also add that we need to pay attention to the interplay between the
Scala version and JDK 17.


[1] FLINK-24998 SIGSEGV in Kryo / C2 CompilerThread on Java 17
https://issues.apache.org/jira/browse/FLINK-24998

[2] FLINK-30501 Update Flink build instruction to deprecate Java 8 instead
of requiring Java 11  https://issues.apache.org/jira/browse/FLINK-30501

Tamir Sagi  于2023年3月16日周四 00:54写道:

> Hey dev community,
>
> I'm writing this email to kick off a discussion following this epic:
> FLINK-15736.
>
> We are moving towards JDK 17 (LTS) , the only blocker now is Flink which
> currently remains on JDK 11 (LTS). Flink does not support JDK 17 yet, with
> no timeline,  the reason, based on the aforementioned ticket is the
> following tickets
>
>   1.  FLINK-24998 - SIGSEGV in Kryo / C2 CompilerThread on Java 17<
> https://issues.apache.org/jira/browse/FLINK-24998>.
>   2.  FLINK-3154 - Update Kryo version from 2.24.0 to latest Kryo LTS
> version
>
> My question is whether it is possible to release a major version (Flink
> 2.0.0) using the latest Kryo version for those who don't need to restore
> old savepoints/checkpoints in newer format.
>
>   1.  Leverage JDK 17 features within JVM
>   2.  Moving from the old format to the newer one will be handled only
> once - a mitigation can be achieved by a conversion tool or external
> serializers, both can be provided later on.
>
> I'd like to emphasize that the next JDK LTS (21) will be released this
> September.  furthermore, Flink already supports JDK 12-15, which is very
> close to JDK 17 (LTS) - that was released in September 2021.  JDK 11 will
> become a legacy soon, as more frameworks moving towards JDK 17 and are less
> likely to support JDK 11 in the near future. (For example, Spring Boot 3
> requires JDK 17 already).
>
> Thank you for your consideration of my request.
>
> Tamir.
>
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>


-- 
Best

ConradJam


[jira] [Created] (FLINK-31477) NestedLoopJoinTest.testLeftOuterJoinWithFilter failed on azure

2023-03-15 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-31477:
--

 Summary: NestedLoopJoinTest.testLeftOuterJoinWithFilter failed on 
azure 
 Key: FLINK-31477
 URL: https://issues.apache.org/jira/browse/FLINK-31477
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: Leonard Xu
 Fix For: 1.16.2


{noformat}
 Failures: 
Mar 15 15:52:32 [ERROR]   NestedLoopJoinTest.testLeftOuterJoinWithFilter1:37 
optimized exec plan expected:<...[InnerJoin], where=[[true], select=[a, e, f], 
build=[left])
Mar 15 15:52:32:- Exchange(distribution=[broadcast])
Mar 15 15:52:32:  +- Calc(select=[a], where=[(a = 10)])
Mar 15 15:52:32: +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
Mar 15 15:52:32+- Calc(select=[e, f], where=[(d = 10])])
Mar 15 15:52:32   +- LegacyT...> but was:<...[InnerJoin], where=[[(a = d)], 
select=[a, d, e, f], build=[left])
Mar 15 15:52:32:- Exchange(distribution=[broadcast])
Mar 15 15:52:32:  +- Calc(select=[a], where=[SEARCH(a, Sarg[10])])
Mar 15 15:52:32: +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
Mar 15 15:52:32+- Calc(select=[d, e, f], where=[SEARCH(d, Sarg[10]])])
Mar 15 15:52:32   +- LegacyT...>{noformat}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47202&view=logs&j=086353db-23b2-5446-2315-18e660618ef2&t=6cd785f3-2a2e-58a8-8e69-b4a03be28843



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null overwrite problem of partial-insert

2023-03-15 Thread Benchao Li
+1 (binding)

Dong Lin  于2023年3月15日周三 23:24写道:

> +1 (binding)
>
> On Mon, Mar 13, 2023 at 8:18 PM Lincoln Lee 
> wrote:
>
> > Dear Flink developers,
> >
> > Thanks for all your feedback for FLIP-300: Add targetColumns to
> > DynamicTableSink#Context to solve the null overwrite problem of
> > partial-insert[1] on the discussion thread[2].
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours unless there is an objection or not enough votes.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
> > [2] https://lists.apache.org/thread/bk8x0nqg4oc62jqryj9ntzzlpj062wd9
> >
> >
> > Best,
> > Lincoln Lee
> >
>


-- 

Best,
Benchao Li


[Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-03-15 Thread Tamir Sagi
Hey dev community,

I'm writing this email to kick off a discussion following this epic: 
FLINK-15736.

We are moving towards JDK 17 (LTS); the only blocker now is Flink, which 
currently remains on JDK 11 (LTS). Flink does not support JDK 17 yet, with no 
timeline. The reason, based on the aforementioned ticket, is the following 
tickets:

  1.  FLINK-24998 - SIGSEGV in Kryo / C2 CompilerThread on Java 17.
  2.  FLINK-3154 - Update Kryo version from 2.24.0 to latest Kryo LTS version

My question is whether it is possible to release a major version (Flink 2.0.0) 
using the latest Kryo version, for those who don't need to restore old 
savepoints/checkpoints in the newer format.

  1.  Leverage JDK 17 features within the JVM.
  2.  Moving from the old format to the newer one would be handled only once - a 
mitigation can be achieved by a conversion tool or external serializers, both of 
which can be provided later on.

I'd like to emphasize that the next JDK LTS (21) will be released this 
September. Furthermore, Flink already supports JDK 12-15, which is very close 
to JDK 17 (LTS), released in September 2021. JDK 11 will become legacy soon, as 
more frameworks move towards JDK 17 and are less likely to support JDK 11 in 
the near future. (For example, Spring Boot 3 already requires JDK 17.)

Thank you for your consideration of my request.

Tamir.




Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


[ANNOUNCE] Apache Flink 1.15.4 released

2023-03-15 Thread Danny Cranmer
The Apache Flink community is very happy to announce the release of Apache
Flink 1.15.4, which is the fourth bugfix release for the Apache Flink 1.15
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/2023/03/15/apache-flink-1.15.4-release-announcement/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352526

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Feel free to reach out to the release managers (or respond to this thread)
with feedback on the release process. Our goal is to constantly improve the
release process. Feedback on what could be improved or things that didn't
go so well is appreciated.

Regards,
Danny


[jira] [Created] (FLINK-31476) AdaptiveScheduler should take lower bound parallelism settings into account

2023-03-15 Thread Jira
David Morávek created FLINK-31476:
-

 Summary: AdaptiveScheduler should take lower bound parallelism 
settings into account
 Key: FLINK-31476
 URL: https://issues.apache.org/jira/browse/FLINK-31476
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: David Morávek






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-connector-parent 1.0.0, release candidate #1

2023-03-15 Thread Chesnay Schepler

Title correction: the version is 1.0.0.

On 15/03/2023 16:40, Chesnay Schepler wrote:

Hi everyone,
Please review and vote on the release candidate, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

This is the first release of the flink-connector-parent pom by the 
Flink project. This subsumes the previous release that I made myself.


A few minor changes have been made; see the release notes.

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org 
[2], which are signed with the key with fingerprint 11D464BA [3],

* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.


Thanks,
Chesnay

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352762
[2] 
https://dist.apache.org/repos/dist/dev/flink/flink-connector-parent-1.0.0-rc1/

[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] 
https://repository.apache.org/content/repositories/orgapacheflink-1597
[5] 
https://github.com/apache/flink-connector-shared-utils/releases/tag/v1.0.0-rc1

[6] https://github.com/apache/flink-web/pull/620






[VOTE] Release flink-connector-parent10.0, release candidate #1

2023-03-15 Thread Chesnay Schepler

Hi everyone,
Please review and vote on the release candidate, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

This is the first release of the flink-connector-parent pom by the Flink 
project. This subsumes the previous release that I made myself.


A few minor changes have been made; see the release notes.

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org 
[2], which are signed with the key with fingerprint 11D464BA [3],

* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.


Thanks,
Chesnay

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352762
[2] 
https://dist.apache.org/repos/dist/dev/flink/flink-connector-parent-1.0.0-rc1/

[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1597
[5] 
https://github.com/apache/flink-connector-shared-utils/releases/tag/v1.0.0-rc1

[6] https://github.com/apache/flink-web/pull/620




Re: [VOTE] FLIP-293: Introduce Flink Jdbc Driver For Sql Gateway

2023-03-15 Thread Samrat Deb
+1 ( non binding)

Bests,
Samrat

On Wed, 15 Mar 2023 at 8:54 PM, Dong Lin  wrote:

> +1 (binding)
>
> On Mon, Mar 13, 2023 at 1:47 PM Shammon FY  wrote:
>
> > Hi Devs,
> >
> > I'd like to start the vote on FLIP-293: Introduce Flink Jdbc Driver For
> Sql
> > Gateway [1].
> >
> > The FLIP was discussed in thread [2], and it aims to introduce Flink Jdbc
> > Driver module in Flink.
> >
> > The vote will last for at least 72 hours (03/16, 15:00 UTC+8) unless
> there
> > is an objection or insufficient vote. Thank you all.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-293%3A+Introduce+Flink+Jdbc+Driver+For+Sql+Gateway
> > [2] https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3
> >
> >
> > Best,
> > Shammon.FY
> >
>


Re: [VOTE] FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null overwrite problem of partial-insert

2023-03-15 Thread Dong Lin
+1 (binding)

On Mon, Mar 13, 2023 at 8:18 PM Lincoln Lee  wrote:

> Dear Flink developers,
>
> Thanks for all your feedback for FLIP-300: Add targetColumns to
> DynamicTableSink#Context to solve the null overwrite problem of
> partial-insert[1] on the discussion thread[2].
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours unless there is an objection or not enough votes.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
> [2] https://lists.apache.org/thread/bk8x0nqg4oc62jqryj9ntzzlpj062wd9
>
>
> Best,
> Lincoln Lee
>


Re: [VOTE] FLIP-293: Introduce Flink Jdbc Driver For Sql Gateway

2023-03-15 Thread Dong Lin
+1 (binding)

On Mon, Mar 13, 2023 at 1:47 PM Shammon FY  wrote:

> Hi Devs,
>
> I'd like to start the vote on FLIP-293: Introduce Flink Jdbc Driver For Sql
> Gateway [1].
>
> The FLIP was discussed in thread [2], and it aims to introduce Flink Jdbc
> Driver module in Flink.
>
> The vote will last for at least 72 hours (03/16, 15:00 UTC+8) unless there
> is an objection or insufficient vote. Thank you all.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-293%3A+Introduce+Flink+Jdbc+Driver+For+Sql+Gateway
> [2] https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3
>
>
> Best,
> Shammon.FY
>


[jira] [Created] (FLINK-31475) Allow project to be user-defined in release scripts

2023-03-15 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-31475:


 Summary: Allow project to be user-defined in release scripts
 Key: FLINK-31475
 URL: https://issues.apache.org/jira/browse/FLINK-31475
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common, Release System
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


The connector release scripts derive the project name from the repository.
For some esoteric cases (like the flink-connector-shared-utils repo) it would 
be beneficial to be able to override this on the command-line.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31474) [Flink] Add failure information for out-of-order checkpoints

2023-03-15 Thread Ming Li (Jira)
Ming Li created FLINK-31474:
---

 Summary: [Flink] Add failure information for out-of-order 
checkpoints
 Key: FLINK-31474
 URL: https://issues.apache.org/jira/browse/FLINK-31474
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Ming Li


At present, when the checkpoint is out of order, only out-of-order logs will be 
printed on the {{Task}} side, while on the {{JM}} side, the checkpoint can only 
fail through timeout, and the real reason cannot be confirmed.

Therefore, I think we should add failure information on the JM side for the 
out-of-order checkpoint.
{code:java}
if (lastCheckpointId >= metadata.getCheckpointId()) {
    LOG.info(
            "Out of order checkpoint barrier (aborted previously?): {} >= {}",
            lastCheckpointId,
            metadata.getCheckpointId());
    channelStateWriter.abort(
            metadata.getCheckpointId(), new CancellationException(), true);
    checkAndClearAbortedStatus(metadata.getCheckpointId());
    return;
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31473) Add new show operations docs

2023-03-15 Thread Ran Tao (Jira)
Ran Tao created FLINK-31473:
---

 Summary: Add new show operations docs
 Key: FLINK-31473
 URL: https://issues.apache.org/jira/browse/FLINK-31473
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Ran Tao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-15 Thread Yun Tang
Hi Yuxin,

Thanks for your explanations.
I think the kernel idea is that you prefer the simple and decoupling design for 
the 1st version of hybrid shuffle with remote storage. If following this idea, 
perhaps I could accept your current explanations and I think you could describe 
it explicitly in the original FLIP's goals or design principles.


Best
Yun Tang

From: Yuxin Tan 
Sent: Wednesday, March 15, 2023 12:41
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

Hi, Yun,

Thanks for sharing the ideas.

> 1. We should trigger to kick the shuffle data to remote storage
once either condition is reached

I believe that configuring two options in this manner is a pragmatic
approach that can fulfill a wider range of usage scenarios. However,
if we present two options, it may become difficult to remove them
in the future once users have started relying on them. On the other
hand, if we introduce a single option, we can easily incorporate
additional options based on your recommendations if required.
Thus, we recommend adopting a one-option solution in the first
version to address the issue.

> 2. Perhaps we could switch to kick shuffle data to remote storage
once no space left exception is met.

Thanks for your valuable feedback. While the suggestion is a viable
solution to address the no space left exception issue, we are
concerned that implementing it could create interdependence between
the disk tier and remote storage tier, which would contradict our goal
of achieving independence between tiers in the new architecture.
Moreover, we believe that it is better to prevent encountering the
exception in the first place by reserving adequate disk space. This is
because other processes on the same machine may also be impacted
when the exception occurs. If the exception does still arise, we can
explore other potential solutions through detailed design and
discussions, such as the one you proposed, optimizing the reserved
space with a global counter of TM, etc. Although the current implementation
only partially addresses the exception issue, we expect to improve it
in subsequent versions due to the complexity of this FLIP. We would
appreciate hearing your thoughts on this matter.

Best,
Yuxin


Yun Tang  于2023年3月14日周二 14:48写道:

> Hi Yuxin
>
> Thanks for your reply.
> I am not saying that we should use an absolute reserved value to replace
> the current plan of the reserved fraction. We should trigger to kick the
> shuffle data to remote storage once either condition is reached.
> Maybe we could get some idea from the configuration of tiered based state
> backend [1].
>
> The concern about concurrent writing is due to the local disk being shared
> by all the instances running on that node, we cannot ensure other
> components would not flush data during shuffle writing. Perhaps we could
> switch to kick shuffle data to remote storage once no space left exception
> is met.
>
>
> [1]
> https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo
>
>
> Best
> Yun Tang
>
> 
> From: Yuxin Tan 
> Sent: Monday, March 13, 2023 15:06
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi,
> Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.
>
> @Zhu Zhu
> > 1. I'm a bit concerned about the cost of it.
>
> The Read Client's request to check the existence of a segment in each
> storage tier is the netty message, which is similar to the credit updating
> messages. It is observed that the netty message cost for these operations
> is relatively low and the number of messages is also relatively low
> compared to credit updates which are sent every few buffers. Moreover,
> since this request involves only memory operations, the message cost
> is significantly low during the total reading data process.
>
> And we will further optimize the message cost later, particularly for
> segments that remain in the same tier without switching tiers. That
> is, for consecutive segments in the same tier, we can continue to send
> the next segment without waiting for the downstream to ask whether the
> segment exists in this tier.
>
>
> @Xia Sun
> > 1. how to keep the order of segments between different storage tiers
>
> To indicate the sequential order of upstream segments, we rely on segmentId
> downstream. Once segment n has been fully consumed by the downstream,
> the subsequent segment n + 1 will be asked in its natural order. As for the
> order of the buffers within each segment, they follow the default ordering
> mechanisms of Flink.
>
> > 2. what will happen if the disk space is found to be full?
>
> By introducing a way of reserving some space in advance, we will try to
> avoid this situation of reporting the disk space error as much as possible.
> The size of reserved space can be co

[jira] [Created] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread

2023-03-15 Thread Ran Tao (Jira)
Ran Tao created FLINK-31472:
---

 Summary: AsyncSinkWriterThrottlingTest failed with Illegal mailbox 
thread
 Key: FLINK-31472
 URL: https://issues.apache.org/jira/browse/FLINK-31472
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.1, 1.17.0
Reporter: Ran Tao


When running {{mvn clean test}}, this case fails occasionally.

[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 s 
<<< FAILURE! - in 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest
[ERROR] 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize
  Time elapsed: 0.492 s  <<< ERROR!
java.lang.IllegalStateException: Illegal thread detected. This method must be 
called from inside the mailbox thread!
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84)
        at 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367)
        at 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315)
        at 
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199)
        at 
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76)
        at 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
        at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
        at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
        at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
        at 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execut

[DISCUSSI] Improve Multi-Query optimization in Flink

2023-03-15 Thread Aitozi
Hi devs,

I want to discuss a new way to optimize multi-sink queries in Flink.

Currently, Flink uses the RelNodeBlock mechanism to optimize multi-sink
queries.

It has the following steps:


   1. A multi-sink query is parsed and validated into multiple RelNode trees.
   2. The common nodes of the RelNode trees are merged into a single node if
   table.optimizer.reuse-optimize-block-with-digest-enabled is enabled.
   3. The RelNode trees are split into multiple RelNodeBlocks.
   4. Each RelNodeBlock is fed to the Calcite planner for optimization.
   5. The original structure is reconstructed from the optimized RelNodeBlocks.

As far as I know (please correct me if I'm wrong), the main purpose of the
RelNodeBlock is the following two things:


   - Calcite does not support DAG optimization, so the RelNodeBlock mechanism
   splits the multi-sink tree into many single trees, which can then be
   optimized by Calcite.
   - In a multi-sink query, we need to reduce repeated calculation of the
   same node. So, if
   table.optimizer.reuse-optimize-block-with-digest-enabled is enabled, we
   can prevent the common node from being optimized into different results,
   which would lead to repeated calculation.

However, in our production we found that RelNodeBlock optimization is not
enough. As noted in CommonSubGraphBasedOptimizer's comments, the optimization
of a RelNodeBlock is local; there is no optimization across RelNodeBlocks.
Take a simple example:

Sink                    Sink
  |                       |
Project(a,b)            Project(a,b,c)
  |                       |
Scan(a,b,c,d,e)         Scan(a,b,c,d,e)

Both branches scan the same table. In the current optimization, we can only
choose, before optimization, whether or not to merge the Scan into one
RelNodeBlock.

If merged, the Scan cannot benefit from optimizations such as project
push-down.

If not merged, the two RelNodeBlocks will generate two different scans during
optimization, reading {a, b} and {a, b, c} respectively. A minimal example of
such a two-sink job is sketched below.
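
For reference, a minimal sketch of a two-sink job with this Scan/Project/Sink
shape; the table names and the datagen/blackhole connectors are only for
illustration:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

/** Illustrative only: a two-sink statement set whose branches read the same table. */
public class MultiSinkExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE src (a INT, b INT, c INT, d INT, e INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink1 (a INT, b INT) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE sink2 (a INT, b INT, c INT) WITH ('connector' = 'blackhole')");

        // Both INSERTs scan `src`; whether (and how) that scan is shared is exactly
        // what the RelNodeBlock / Spool discussion above is about.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink1 SELECT a, b FROM src");
        set.addInsertSql("INSERT INTO sink2 SELECT a, b, c FROM src");
        set.execute();
    }
}
{code}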

So I'm proposing a new way to improve the CTE optimization of multi-sink
queries (and single queries).


   1. Insert a VirtualMultiSink to pack the sink nodes together, as described
   in [2], which is inspired by [3].
   2. Insert a new Spool node (meaning produced once, consumed multiple times)
   on top of each RelNode that has multiple outputs.
   3. Implement several rules around the Spool node:
      - PushProjectToSpool, to prune the fields unused by all of the
      Spool node's parents
      - PushFilterToSpool, to push down the DNF of the conditions of all the
      Spool node's parents
      - ...
   4. Furthermore, we can implement a rule to discard the Spool and then let
   the planner decide whether to reuse or not based on the cost of each plan.
   5. After the physical rewrite, we can remove the
   PhysicalVirtualMultiSink and the Spool node.

The benefits of the new approach are:

   1. It does the optimization in a single tree, so purely local
   optimization can be avoided.
   2. Cost-based CTE optimization becomes available.
   3. It can optimize CTEs in both multi-sink and single-sink queries, so the
   problem in [1] can also be resolved.
   4. It avoids the trait propagation between RelNodeBlocks in the current
   solution, as described in [4].

Looking forward to your inputs.

Best,

Aitozi.

[1]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-29088

[2]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-31205

[3]: Efficient and Extensible Algorithms for Multi Query Optimization
https://arxiv.org/pdf/cs/9910021.pdf

[4]: https://issues.apache.org/jira/browse/FLINK-24048


[jira] [Created] (FLINK-31470) Introduce integration tests for Externalized Declarative Resource Management

2023-03-15 Thread Jira
David Morávek created FLINK-31470:
-

 Summary: Introduce integration tests for Externalized Declarative 
Resource Management
 Key: FLINK-31470
 URL: https://issues.apache.org/jira/browse/FLINK-31470
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: David Morávek






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31471) Allow changing per-vertex desired parallelism through WEB UI

2023-03-15 Thread Jira
David Morávek created FLINK-31471:
-

 Summary: Allow changing per-vertex desired parallelism through WEB 
UI
 Key: FLINK-31471
 URL: https://issues.apache.org/jira/browse/FLINK-31471
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Reporter: David Morávek






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31469) Allow setting JobResourceRequirements through REST API

2023-03-15 Thread Jira
David Morávek created FLINK-31469:
-

 Summary: Allow setting JobResourceRequirements through REST API
 Key: FLINK-31469
 URL: https://issues.apache.org/jira/browse/FLINK-31469
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / REST
Reporter: David Morávek






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31468) Allow setting JobResourceRequirements through DispatcherGateway

2023-03-15 Thread Jira
David Morávek created FLINK-31468:
-

 Summary: Allow setting JobResourceRequirements through 
DispatcherGateway
 Key: FLINK-31468
 URL: https://issues.apache.org/jira/browse/FLINK-31468
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: David Morávek






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31467) Support time travel for Spark 3.3

2023-03-15 Thread yuzelin (Jira)
yuzelin created FLINK-31467:
---

 Summary: Support time travel for Spark 3.3
 Key: FLINK-31467
 URL: https://issues.apache.org/jira/browse/FLINK-31467
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin


Support Spark 3.3 `VERSION AS OF` and `TIMESTAMP AS OF` Syntax



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-03-15 Thread Leonard Xu
Thanks Hongshun for picking up the ticket and analyzing the details so deeply.

As you have discussed with Qingsheng offline, I think we can update the content 
to FLIP-288[1] and then start the FLIP discussion.


Best,
Leonard
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source

> On Mar 15, 2023, at 5:25 PM, Hongshun Wang  wrote:
> 
> Thanks for your advise! I will do this later.
> 
> 
> Best, Hongshun
> 
> On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot 
> wrote:
> 
>> Hi,
>> 
>> Why not track this in a FLIP and a ticket and link this discussion thread.
>> 
>> My 2 cents
>> 
>> Etienne
>> 
>> Le 15/03/2023 à 10:01, Hongshun Wang a écrit :
>>>  Hi devs,
>>> I’d like to join this discussion. CC:Qingsheng
>>> As discussed above, new partitions after the first discovery should be
>>> consumed from EARLIEST offset.
>>> 
>>> However, when KafkaSourceEnumerator restarts after a job failure, it
>> cannot
>>> distinguish between unassigned partitions as first-discovered or new,
>>> because the snapshot state currently only contains assignedPartitions
>>> collection (the assigned partitions). We can solve this by adding a
>>> unAssignedInitialPartitons collection to snapshot state, which represents
>>> the collection of first discovered partitions that have not yet been
>>> assigned. Also, we can combine this two collections into a single
>>> collection if we add status to each item.
>>> 
>>> Besides , there is also a problem which often occurs in pattern mode to
>>> distinguish between the following two case:
>>> 
>>>1. Case1:  The first partition discovery is too slow, before which
>> the
>>>checkpoint is finished and then job is restarted .At this time, the
>>>restored unAssignedInitialPartitons is an empty collection, which
>> means
>>>non-discovery. The next discovery will be treated as the first
>> discovery.
>>>2. Case2:  The first time the partition is obtained is empty, and new
>>>partitions can only be obtained after multiple partition
>> discoveries. If a
>>>restart occurs between this period, the restored
>>>*unAssignedInitialPartitons* is also an empty collection, which means
>>>empty-discovery. However, the next discovery should be treated as a
>> new
>>>discovery.
>>> 
>>> We can solve this problem by adding a boolean value(*firstDiscoveryDone*)
>>> to snapshot state, which represents whether the first-discovery has been
>>> done.
>>> 
>>> Also two rejected alternatives :
>>> 
>>>1. Change the KafkaSourceEnumerator's snapshotState method to a
>> blocking
>>>one, which resumes only after the first-discovered partition has been
>>>successfully assigned to KafkaSourceReader. The advantage of this
>> approach
>>>is no need to change the snapshot state's variable values. However,
>> if
>>>first-discovered partitions are not assigned before checkpointing,
>> the
>>>SourceCoordinator's event-loop thread will be blocked, but partition
>>>assignment also requires the event-loop thread to execute, which
>> will cause
>>>thread self-locking.
>>>2. An alternative to the *firstDiscoveryDone* variable. If we change
>> the
>>>first discovery method to a synchronous method, we can ensure that
>> Case1
>>>will never happen. Because when the event-loop thread starts, it
>> first adds
>>>a discovery event to the blocking queue. When it turns to execute the
>>>checkpoint event, the partition has already been discovered
>> successfully.
>>>However, If partition discovery is a heavily time-consuming
>> operation, the
>>>SourceCoordinator cannot process other event operations during the
>> waiting
>>>period, such as reader registration. It is a waste.
>>> 
>>> Best regards,
>>> Hongshun
>>> 
>>> On 2023/01/13 03:31:20 Qingsheng Ren wrote:
 Hi devs,
 
 I’d like to start a discussion about enabling the dynamic partition
 discovery feature by default in Kafka source. Dynamic partition
>> discovery
 [1] is a useful feature in Kafka source especially under the scenario
>> when
 the consuming Kafka topic scales out, or the source subscribes to
>> multiple
 Kafka topics with a pattern. Users don’t have to restart the Flink job
>> to
 consume messages in the new partition with this feature enabled.
>>> Currently,
 dynamic partition discovery is disabled by default and users have to
 explicitly specify the interval of discovery in order to turn it on.
 
 # Breaking changes
 
 For Kafka table source:
 
 - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
 default.
 - As we need to provide a way for users to disable the feature,
 “scan.topic-partition-discovery.interval” = “0” will be used to turn off
 the discovery. Before this proposal, “0” means to enable partition
 discovery with interval = 0, which is a bit senseless in practice.
 Unfortunately 

[jira] [Created] (FLINK-31466) Backport "FilterJoinRule misses opportunity to push filter to semijoin input" to FlinkFilterJoinRule

2023-03-15 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-31466:
---

 Summary: Backport "FilterJoinRule misses opportunity to push 
filter to semijoin input" to FlinkFilterJoinRule
 Key: FLINK-31466
 URL: https://issues.apache.org/jira/browse/FLINK-31466
 Project: Flink
  Issue Type: Improvement
Reporter: Sergey Nuyanzin


In https://issues.apache.org/jira/browse/CALCITE-4499 an optimization has been 
made.
Since Flink has its own, slightly changed copy of {{FilterJoinRule}}, this 
optimization does not come with the Calcite 1.28 update.

The idea is to apply this change to {{FlinkFilterJoinRule}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31465) [Flink] Fix shortcode errors in docs

2023-03-15 Thread Ming Li (Jira)
Ming Li created FLINK-31465:
---

 Summary: [Flink] Fix shortcode errors in docs
 Key: FLINK-31465
 URL: https://issues.apache.org/jira/browse/FLINK-31465
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Ming Li


When running docs with hugo, I get the following exception:
{code:java}
hugo v0.111.3+extended darwin/amd64 BuildDate=unknown
Error: Error building site: 
"/xxx/flink-table-store/docs/content/docs/how-to/writing-tables.md:303:1": 
failed to extract shortcode: shortcode "tabs" must be closed or 
self-closed{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-296: Extend watermark-related features for SQL

2023-03-15 Thread Yun Tang
+1 (binding)

Thanks Kui for driving this work.


Best
Yun Tang

From: Kui Yuan 
Sent: Wednesday, March 15, 2023 16:45
To: dev@flink.apache.org 
Subject: [VOTE] FLIP-296: Extend watermark-related features for SQL

Hi all,

I want to start the vote of FLIP-296: Extend watermark-related features for
SQL [1].The FLIP was discussed in this thread [2].  The goal of the FLIP is
to extend those watermark-related features already implemented on the
datastream api for SQL users.

The vote will last for at least 72 hours unless there is an objection or
insufficient votes. Thank you all. [1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL
[2] https://lists.apache.org/thread/d681bx4t935c30zl750gy6d41tfypbph
  Best,
Kui Yuan


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-03-15 Thread Shammon FY
Hi Hongshun

I agree with @Etienne. Also, could you describe this process and the problems in
a figure? Thanks

Best,
Shammon FY

On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot 
wrote:

> Hi,
>
> Why not track this in a FLIP and a ticket and link this discussion thread.
>
> My 2 cents
>
> Etienne
>
> > On 15/03/2023 at 10:01, Hongshun Wang wrote:
> >   Hi devs,
> > I’d like to join this discussion. CC:Qingsheng
> > As discussed above, new partitions after the first discovery should be
> > consumed from EARLIEST offset.
> >
> > However, when KafkaSourceEnumerator restarts after a job failure, it
> cannot
> > distinguish between unassigned partitions as first-discovered or new,
> > because the snapshot state currently only contains assignedPartitions
> > collection (the assigned partitions). We can solve this by adding a
> > unAssignedInitialPartitons collection to snapshot state, which represents
> > the collection of first discovered partitions that have not yet been
> > assigned. Also, we can combine this two collections into a single
> > collection if we add status to each item.
> >
> > Besides , there is also a problem which often occurs in pattern mode to
> > distinguish between the following two case:
> >
> > 1. Case1:  The first partition discovery is too slow, before which
> the
> > checkpoint is finished and then job is restarted .At this time, the
> > restored unAssignedInitialPartitons is an empty collection, which
> means
> > non-discovery. The next discovery will be treated as the first
> discovery.
> > 2. Case2:  The first time the partition is obtained is empty, and new
> > partitions can only be obtained after multiple partition
> discoveries. If a
> > restart occurs between this period, the restored
> > *unAssignedInitialPartitons* is also an empty collection, which means
> > empty-discovery. However, the next discovery should be treated as a
> new
> > discovery.
> >
> > We can solve this problem by adding a boolean value(*firstDiscoveryDone*)
> > to snapshot state, which represents whether the first-discovery has been
> > done.
> >
> > Also two rejected alternatives :
> >
> > 1. Change the KafkaSourceEnumerator's snapshotState method to a
> blocking
> > one, which resumes only after the first-discovered partition has been
> > successfully assigned to KafkaSourceReader. The advantage of this
> approach
> > is no need to change the snapshot state's variable values. However,
> if
> > first-discovered partitions are not assigned before checkpointing,
> the
> > SourceCoordinator's event-loop thread will be blocked, but partition
> > assignment also requires the event-loop thread to execute, which
> will cause
> > thread self-locking.
> > 2. An alternative to the *firstDiscoveryDone* variable. If we change
> the
> > first discovery method to a synchronous method, we can ensure that
> Case1
> > will never happen. Because when the event-loop thread starts, it
> first adds
> > a discovery event to the blocking queue. When it turns to execute the
> > checkpoint event, the partition has already been discovered
> successfully.
> > However, If partition discovery is a heavily time-consuming
> operation, the
> > SourceCoordinator cannot process other event operations during the
> waiting
> > period, such as reader registration. It is a waste.
> >
> > Best regards,
> > Hongshun
> >
> > On 2023/01/13 03:31:20 Qingsheng Ren wrote:
> >> Hi devs,
> >>
> >> I’d like to start a discussion about enabling the dynamic partition
> >> discovery feature by default in Kafka source. Dynamic partition
> discovery
> >> [1] is a useful feature in Kafka source especially under the scenario
> when
> >> the consuming Kafka topic scales out, or the source subscribes to
> multiple
> >> Kafka topics with a pattern. Users don’t have to restart the Flink job
> to
> >> consume messages in the new partition with this feature enabled.
> > Currently,
> >> dynamic partition discovery is disabled by default and users have to
> >> explicitly specify the interval of discovery in order to turn it on.
> >>
> >> # Breaking changes
> >>
> >> For Kafka table source:
> >>
> >> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
> >> default.
> >> - As we need to provide a way for users to disable the feature,
> >> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
> >> the discovery. Before this proposal, “0” means to enable partition
> >> discovery with interval = 0, which is a bit senseless in practice.
> >> Unfortunately we can't use negative values as the type of this option is
> >> Duration.
> >>
> >> For KafkaSource (DataStream API)
> >>
> >> - Dynamic partition discovery in Kafka source will be enabled by
> default,
> >> with discovery interval set to 30 seconds.
> >> - To align with table source, only a positive value for option “
> >> partition.discovery.interval.ms” could be used to specify t

Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-03-15 Thread Hongshun Wang
Thanks for your advice! I will do this later.


Best, Hongshun

On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot 
wrote:

> Hi,
>
> Why not track this in a FLIP and a ticket and link this discussion thread.
>
> My 2 cents
>
> Etienne
>
> > On 15/03/2023 at 10:01, Hongshun Wang wrote:
> >   Hi devs,
> > I’d like to join this discussion. CC:Qingsheng
> > As discussed above, new partitions after the first discovery should be
> > consumed from EARLIEST offset.
> >
> > However, when KafkaSourceEnumerator restarts after a job failure, it
> cannot
> > distinguish between unassigned partitions as first-discovered or new,
> > because the snapshot state currently only contains assignedPartitions
> > collection (the assigned partitions). We can solve this by adding a
> > unAssignedInitialPartitons collection to snapshot state, which represents
> > the collection of first discovered partitions that have not yet been
> > assigned. Also, we can combine this two collections into a single
> > collection if we add status to each item.
> >
> > Besides , there is also a problem which often occurs in pattern mode to
> > distinguish between the following two case:
> >
> > 1. Case1:  The first partition discovery is too slow, before which
> the
> > checkpoint is finished and then job is restarted .At this time, the
> > restored unAssignedInitialPartitons is an empty collection, which
> means
> > non-discovery. The next discovery will be treated as the first
> discovery.
> > 2. Case2:  The first time the partition is obtained is empty, and new
> > partitions can only be obtained after multiple partition
> discoveries. If a
> > restart occurs between this period, the restored
> > *unAssignedInitialPartitons* is also an empty collection, which means
> > empty-discovery. However, the next discovery should be treated as a
> new
> > discovery.
> >
> > We can solve this problem by adding a boolean value(*firstDiscoveryDone*)
> > to snapshot state, which represents whether the first-discovery has been
> > done.
> >
> > Also two rejected alternatives :
> >
> > 1. Change the KafkaSourceEnumerator's snapshotState method to a
> blocking
> > one, which resumes only after the first-discovered partition has been
> > successfully assigned to KafkaSourceReader. The advantage of this
> approach
> > is no need to change the snapshot state's variable values. However,
> if
> > first-discovered partitions are not assigned before checkpointing,
> the
> > SourceCoordinator's event-loop thread will be blocked, but partition
> > assignment also requires the event-loop thread to execute, which
> will cause
> > thread self-locking.
> > 2. An alternative to the *firstDiscoveryDone* variable. If we change
> the
> > first discovery method to a synchronous method, we can ensure that
> Case1
> > will never happen. Because when the event-loop thread starts, it
> first adds
> > a discovery event to the blocking queue. When it turns to execute the
> > checkpoint event, the partition has already been discovered
> successfully.
> > However, If partition discovery is a heavily time-consuming
> operation, the
> > SourceCoordinator cannot process other event operations during the
> waiting
> > period, such as reader registration. It is a waste.
> >
> > Best regards,
> > Hongshun
> >
> > On 2023/01/13 03:31:20 Qingsheng Ren wrote:
> >> Hi devs,
> >>
> >> I’d like to start a discussion about enabling the dynamic partition
> >> discovery feature by default in Kafka source. Dynamic partition
> discovery
> >> [1] is a useful feature in Kafka source especially under the scenario
> when
> >> the consuming Kafka topic scales out, or the source subscribes to
> multiple
> >> Kafka topics with a pattern. Users don’t have to restart the Flink job
> to
> >> consume messages in the new partition with this feature enabled.
> > Currently,
> >> dynamic partition discovery is disabled by default and users have to
> >> explicitly specify the interval of discovery in order to turn it on.
> >>
> >> # Breaking changes
> >>
> >> For Kafka table source:
> >>
> >> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
> >> default.
> >> - As we need to provide a way for users to disable the feature,
> >> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
> >> the discovery. Before this proposal, “0” means to enable partition
> >> discovery with interval = 0, which is a bit senseless in practice.
> >> Unfortunately we can't use negative values as the type of this option is
> >> Duration.
> >>
> >> For KafkaSource (DataStream API)
> >>
> >> - Dynamic partition discovery in Kafka source will be enabled by
> default,
> >> with discovery interval set to 30 seconds.
> >> - To align with table source, only a positive value for option “
> >> partition.discovery.interval.ms” could be used to specify the discovery
> >> interval. Both negative and zero will be i

[jira] [Created] (FLINK-31464) Move SqlNode convert logic out from SqlToOperationConverter

2023-03-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-31464:


 Summary: Move SqlNode convert logic out from 
SqlToOperationConverter
 Key: FLINK-31464
 URL: https://issues.apache.org/jira/browse/FLINK-31464
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: luoyuxia


Similar to FLINK-31368, the `SqlToOperationConverter` is a bit bloated. We can 
refactor it to keep the code length of this class from growing quickly.

We can follow the idea proposed in FLINK-31368.
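
For example, the refactoring could introduce one small converter per kind of 
SqlNode and a registry that dispatches to it. The sketch below is illustrative 
only; the interface and class names are hypothetical and simplified, not the 
actual classes added by FLINK-31368.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.calcite.sql.SqlNode;
import org.apache.flink.table.operations.Operation;

/** One small converter per statement kind, instead of another branch in SqlToOperationConverter. */
interface SqlNodeConverter {

    /** Whether this converter is responsible for the given node. */
    boolean supports(SqlNode node);

    /** Converts a supported node into a planner Operation. */
    Operation convert(SqlNode node);
}

/** Dispatches a SqlNode to the first registered converter that supports it. */
final class SqlNodeConverters {

    private final List<SqlNodeConverter> converters = new ArrayList<>();

    void register(SqlNodeConverter converter) {
        converters.add(converter);
    }

    Optional<Operation> convert(SqlNode node) {
        return converters.stream()
                .filter(c -> c.supports(node))
                .findFirst()
                .map(c -> c.convert(node));
    }
}
{code}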

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-03-15 Thread Etienne Chauchot

Hi,

Why not track this in a FLIP and a ticket and link this discussion thread.

My 2 cents

Etienne

On 15/03/2023 at 10:01, Hongshun Wang wrote:

  Hi devs,
I’d like to join this discussion. CC:Qingsheng
As discussed above, new partitions after the first discovery should be
consumed from EARLIEST offset.

However, when KafkaSourceEnumerator restarts after a job failure, it cannot
distinguish between unassigned partitions as first-discovered or new,
because the snapshot state currently only contains assignedPartitions
collection (the assigned partitions). We can solve this by adding a
unAssignedInitialPartitons collection to snapshot state, which represents
the collection of first discovered partitions that have not yet been
assigned. Also, we can combine this two collections into a single
collection if we add status to each item.

Besides , there is also a problem which often occurs in pattern mode to
distinguish between the following two case:

1. Case1:  The first partition discovery is too slow, before which the
checkpoint is finished and then job is restarted .At this time, the
restored unAssignedInitialPartitons is an empty collection, which means
non-discovery. The next discovery will be treated as the first discovery.
2. Case2:  The first time the partition is obtained is empty, and new
partitions can only be obtained after multiple partition discoveries. If a
restart occurs between this period, the restored
*unAssignedInitialPartitons* is also an empty collection, which means
empty-discovery. However, the next discovery should be treated as a new
discovery.

We can solve this problem by adding a boolean value(*firstDiscoveryDone*)
to snapshot state, which represents whether the first-discovery has been
done.

Also two rejected alternatives :

1. Change the KafkaSourceEnumerator's snapshotState method to a blocking
one, which resumes only after the first-discovered partition has been
successfully assigned to KafkaSourceReader. The advantage of this approach
is no need to change the snapshot state's variable values. However, if
first-discovered partitions are not assigned before checkpointing, the
SourceCoordinator's event-loop thread will be blocked, but partition
assignment also requires the event-loop thread to execute, which will cause
thread self-locking.
2. An alternative to the *firstDiscoveryDone* variable. If we change the
first discovery method to a synchronous method, we can ensure that Case1
will never happen. Because when the event-loop thread starts, it first adds
a discovery event to the blocking queue. When it turns to execute the
checkpoint event, the partition has already been discovered successfully.
However, If partition discovery is a heavily time-consuming operation, the
SourceCoordinator cannot process other event operations during the waiting
period, such as reader registration. It is a waste.

Best regards,
Hongshun

On 2023/01/13 03:31:20 Qingsheng Ren wrote:

Hi devs,

I’d like to start a discussion about enabling the dynamic partition
discovery feature by default in Kafka source. Dynamic partition discovery
[1] is a useful feature in Kafka source especially under the scenario when
the consuming Kafka topic scales out, or the source subscribes to multiple
Kafka topics with a pattern. Users don’t have to restart the Flink job to
consume messages in the new partition with this feature enabled.

Currently,

dynamic partition discovery is disabled by default and users have to
explicitly specify the interval of discovery in order to turn it on.

# Breaking changes

For Kafka table source:

- “scan.topic-partition-discovery.interval” will be set to 30 seconds by
default.
- As we need to provide a way for users to disable the feature,
“scan.topic-partition-discovery.interval” = “0” will be used to turn off
the discovery. Before this proposal, “0” means to enable partition
discovery with interval = 0, which is a bit senseless in practice.
Unfortunately we can't use negative values as the type of this option is
Duration.

For KafkaSource (DataStream API)

- Dynamic partition discovery in Kafka source will be enabled by default,
with discovery interval set to 30 seconds.
- To align with table source, only a positive value for option “
partition.discovery.interval.ms” could be used to specify the discovery
interval. Both negative and zero will be interpreted as disabling the
feature.

# Overhead of partition discovery

Partition discovery is made on KafkaSourceEnumerator, which asynchronously
fetches topic metadata from Kafka cluster and checks if there’s any new
topic and partition. This shouldn’t introduce performance issues on the
Flink side.

On the Kafka broker side, partition discovery makes MetadataRequest to
Kafka broker for fetching topic infos. Considering Kafka broker has its
metadata cache and the default request frequency is relatively low (per 30
seconds), this is

Re: [VOTE] FLIP-293: Introduce Flink Jdbc Driver For Sql Gateway

2023-03-15 Thread Hang Ruan
+1 (non-binding)

Best,
Hang

Leonard Xu  wrote on Wed, Mar 15, 2023 at 15:53:

> Thanks shammon for driving this FLIP.
>
> +1 (binding)
>
> Best,
> Leonard
>
> On Wed, Mar 15, 2023 at 1:36 PM weijie guo 
> wrote:
>
> > +1 (binding)
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Jacky Lau  wrote on Wed, Mar 15, 2023 at 13:30:
> >
> > > +1 (non-binding)
> > > it will make integrate with other tools convenient, like
> beeline/tableau
> > >
> > > Best Regards,
> > > Jacky Lau
> > >
> > > Ran Tao  wrote on Wed, Mar 15, 2023 at 13:11:
> > >
> > > > It's a very nice improvement.
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Best Regards,
> > > > Ran Tao
> > > >
> > > >
> > > > Shammon FY  wrote on Mon, Mar 13, 2023 at 13:47:
> > > >
> > > > > Hi Devs,
> > > > >
> > > > > I'd like to start the vote on FLIP-293: Introduce Flink Jdbc Driver
> > For
> > > > Sql
> > > > > Gateway [1].
> > > > >
> > > > > The FLIP was discussed in thread [2], and it aims to introduce
> Flink
> > > Jdbc
> > > > > Driver module in Flink.
> > > > >
> > > > > The vote will last for at least 72 hours (03/16, 15:00 UTC+8)
> unless
> > > > there
> > > > > is an objection or insufficient vote. Thank you all.
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-293%3A+Introduce+Flink+Jdbc+Driver+For+Sql+Gateway
> > > > > [2]
> https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3
> > > > >
> > > > >
> > > > > Best,
> > > > > Shammon.FY
> > > > >
> > > >
> > >
> >
>


RE: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-03-15 Thread Hongshun Wang
 Hi devs,

I’d like to join this discussion. CC: Qingsheng

As discussed above, new partitions found after the first discovery should be
consumed from the EARLIEST offset.

However, when the KafkaSourceEnumerator restarts after a job failure, it cannot
distinguish whether an unassigned partition is first-discovered or newly
discovered, because the snapshot state currently only contains the
assignedPartitions collection (the already assigned partitions). We can solve
this by adding an unAssignedInitialPartitons collection to the snapshot state,
which represents the first-discovered partitions that have not yet been
assigned. Alternatively, we can combine these two collections into a single
collection if we add a status to each item.

Besides, there is also a problem, which often occurs in pattern mode, of
distinguishing between the following two cases:

   1. Case 1: The first partition discovery is too slow; the checkpoint
   finishes before it and the job is then restarted. At this time, the restored
   unAssignedInitialPartitons is an empty collection, which means no discovery
   has happened yet. The next discovery will be treated as the first discovery.
   2. Case 2: The first discovery returns an empty set of partitions, and new
   partitions are only obtained after several further discoveries. If a restart
   occurs during this period, the restored *unAssignedInitialPartitons* is also
   an empty collection, but it means an empty first discovery has already
   happened. The next discovery should therefore be treated as a new discovery,
   not the first one.

We can solve this problem by adding a boolean value (*firstDiscoveryDone*) to
the snapshot state, which represents whether the first discovery has been
done.
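
Put concretely, the enlarged snapshot state could look roughly like the sketch
below (the class and field names are illustrative and only mirror the wording
above; this is not the actual KafkaSourceEnumState class):

import java.util.Set;

import org.apache.kafka.common.TopicPartition;

/** Illustrative only: a possible shape for the enumerator checkpoint state. */
public class KafkaSourceEnumStateSketch {

    // Partitions already assigned to readers.
    private final Set<TopicPartition> assignedPartitions;

    // First-discovered partitions that have not been assigned yet
    // (the unAssignedInitialPartitons collection proposed above).
    private final Set<TopicPartition> unassignedInitialPartitions;

    // Whether the first discovery has completed, so that an empty collection
    // after restore can be told apart from "no discovery yet".
    private final boolean firstDiscoveryDone;

    public KafkaSourceEnumStateSketch(
            Set<TopicPartition> assignedPartitions,
            Set<TopicPartition> unassignedInitialPartitions,
            boolean firstDiscoveryDone) {
        this.assignedPartitions = assignedPartitions;
        this.unassignedInitialPartitions = unassignedInitialPartitions;
        this.firstDiscoveryDone = firstDiscoveryDone;
    }

    // Getters and the state serializer changes are omitted.
}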

There are also two rejected alternatives:

   1. Change the KafkaSourceEnumerator's snapshotState method to a blocking
   one, which returns only after the first-discovered partitions have been
   successfully assigned to the KafkaSourceReader. The advantage of this
   approach is that there is no need to change the contents of the snapshot
   state. However, if the first-discovered partitions are not assigned before
   checkpointing, the SourceCoordinator's event-loop thread will be blocked;
   since partition assignment also requires the event-loop thread to execute,
   this causes the thread to deadlock on itself.
   2. An alternative to the *firstDiscoveryDone* variable: if we make the first
   discovery synchronous, Case 1 can never happen, because when the event-loop
   thread starts it first adds a discovery event to the blocking queue, so by
   the time the checkpoint event is executed the partitions have already been
   discovered. However, if partition discovery is a heavily time-consuming
   operation, the SourceCoordinator cannot process other events, such as reader
   registration, while it waits. That is wasteful.

Best regards,
Hongshun

On 2023/01/13 03:31:20 Qingsheng Ren wrote:
> Hi devs,
>
> I’d like to start a discussion about enabling the dynamic partition
> discovery feature by default in Kafka source. Dynamic partition discovery
> [1] is a useful feature in Kafka source especially under the scenario when
> the consuming Kafka topic scales out, or the source subscribes to multiple
> Kafka topics with a pattern. Users don’t have to restart the Flink job to
> consume messages in the new partition with this feature enabled.
Currently,
> dynamic partition discovery is disabled by default and users have to
> explicitly specify the interval of discovery in order to turn it on.
>
> # Breaking changes
>
> For Kafka table source:
>
> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
> default.
> - As we need to provide a way for users to disable the feature,
> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
> the discovery. Before this proposal, “0” means to enable partition
> discovery with interval = 0, which is a bit senseless in practice.
> Unfortunately we can't use negative values as the type of this option is
> Duration.
>
> For KafkaSource (DataStream API)
>
> - Dynamic partition discovery in Kafka source will be enabled by default,
> with discovery interval set to 30 seconds.
> - To align with table source, only a positive value for option “
> partition.discovery.interval.ms” could be used to specify the discovery
> interval. Both negative and zero will be interpreted as disabling the
> feature.
>
> # Overhead of partition discovery
>
> Partition discovery is made on KafkaSourceEnumerator, which asynchronously
> fetches topic metadata from Kafka cluster and checks if there’s any new
> topic and partition. This shouldn’t introduce performance issues on the
> Flink side.
>
> On the Kafka broker side, partition discovery makes MetadataRequest to
> Kafka broker for fetching topic infos. Considering Kafka broker has its
> metadata cache and the default request frequency is relatively low (per 30
> seconds), this is not a heavy operation and the performance of the broker
> won’t be affected a lot. It'll also be grea

[jira] [Created] (FLINK-31463) When I use apache flink1.12.2 version, the following akka error often occurs.

2023-03-15 Thread Zhuang Liu (Jira)
Zhuang Liu  created FLINK-31463:
---

 Summary: When I use apache flink1.12.2 version, the following akka 
error often occurs.
 Key: FLINK-31463
 URL: https://issues.apache.org/jira/browse/FLINK-31463
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.15.4
Reporter: Zhuang Liu 


When I use Apache Flink version 1.12.2, the following Akka error often occurs.

java.util.concurrent.TimeoutException: Remote system has been silent for too
long. (more than 48.0 hours)
at
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



I checked that 48 hours ago a process did indeed hang inside Flink, and the 
Flink job was restarted. How should this be dealt with? Is this a bug in Akka or 
Flink? Thank you!



 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] FLIP-296: Extend watermark-related features for SQL

2023-03-15 Thread Kui Yuan
Hi all,

I want to start the vote on FLIP-296: Extend watermark-related features for
SQL [1]. The FLIP was discussed in this thread [2]. The goal of the FLIP is
to extend those watermark-related features already implemented in the
DataStream API to SQL users.

The vote will last for at least 72 hours unless there is an objection or
insufficient votes. Thank you all.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL
[2] https://lists.apache.org/thread/d681bx4t935c30zl750gy6d41tfypbph

Best,
Kui Yuan
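
For context, the DataStream-side features the FLIP refers to look roughly like
the sketch below (the durations and group name are placeholders; the SQL
counterparts are the ones defined in the FLIP itself):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkFeaturesSketch {

    public static void main(String[] args) {
        // Existing DataStream API capabilities that FLIP-296 aims to expose to SQL users:
        // bounded out-of-orderness watermarks, idle-source handling, and watermark alignment.
        WatermarkStrategy<String> strategy =
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofMinutes(1))
                        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(30));

        // The strategy would then be passed to env.fromSource(source, strategy, "name").
    }
}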


[jira] [Created] (FLINK-31462) [Flink] Supports full calculation from the specified snapshot

2023-03-15 Thread Ming Li (Jira)
Ming Li created FLINK-31462:
---

 Summary: [Flink] Supports full calculation from the specified 
snapshot
 Key: FLINK-31462
 URL: https://issues.apache.org/jira/browse/FLINK-31462
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Ming Li


Currently, Table Store provides a startup mode for incremental consumption 
starting from a specified snapshot. We can additionally provide a startup mode that 
first performs a full computation of the specified snapshot and then continues with 
incremental consumption.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-293: Introduce Flink Jdbc Driver For Sql Gateway

2023-03-15 Thread Leonard Xu
Thanks shammon for driving this FLIP.

+1 (binding)

Best,
Leonard

On Wed, Mar 15, 2023 at 1:36 PM weijie guo 
wrote:

> +1 (binding)
>
> Best regards,
>
> Weijie
>
>
> Jacky Lau  wrote on Wed, Mar 15, 2023 at 13:30:
>
> > +1 (non-binding)
> > it will make integrate with other tools convenient, like beeline/tableau
> >
> > Best Regards,
> > Jacky Lau
> >
> > Ran Tao  wrote on Wed, Mar 15, 2023 at 13:11:
> >
> > > It's a very nice improvement.
> > >
> > > +1 (non-binding)
> > >
> > > Best Regards,
> > > Ran Tao
> > >
> > >
> > > Shammon FY  wrote on Mon, Mar 13, 2023 at 13:47:
> > >
> > > > Hi Devs,
> > > >
> > > > I'd like to start the vote on FLIP-293: Introduce Flink Jdbc Driver
> For
> > > Sql
> > > > Gateway [1].
> > > >
> > > > The FLIP was discussed in thread [2], and it aims to introduce Flink
> > Jdbc
> > > > Driver module in Flink.
> > > >
> > > > The vote will last for at least 72 hours (03/16, 15:00 UTC+8) unless
> > > there
> > > > is an objection or insufficient vote. Thank you all.
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-293%3A+Introduce+Flink+Jdbc+Driver+For+Sql+Gateway
> > > > [2] https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3
> > > >
> > > >
> > > > Best,
> > > > Shammon.FY
> > > >
> > >
> >
>


Re: [VOTE] Release 1.17.0, release candidate #2

2023-03-15 Thread Junrui Lee
Thanks Qingsheng for driving this.
+1 (non-binding)

- built from source code
- ran some batch jobs; query results are as expected, the default batch
scheduler is AdaptiveBatchScheduler, the WebUI looks good, and there is no
suspicious output/log.
- started a Flink YARN session cluster and ran the 10 TB TPC-DS benchmark; all
jobs finished without any exception under the AdaptiveBatchScheduler. Only
configuring parallelism.default enables operators whose parallelism is not
specified by the user to automatically derive their parallelism based on the
amount of data, without being limited to powers of 2.

Best,
Junrui

Etienne Chauchot  wrote on Tue, Mar 14, 2023 at 19:00:

> Hi all,
>
> As promised, I ran the same tests on 1.17.0 RC2 I also verified the
> release notes.
>
> Based on the scope of these tests : +1 (non-binding)
>
> Etienne
>
> On 14/03/2023 at 07:44, Qingsheng Ren wrote:
> > Hi everyone,
> >
> > Please review and vote on the release candidate #2 for the version
> > 1.17.0, as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The complete staging area is available for your review, which includes:
> >
> > * JIRA release notes [1], and the pull request adding release note for
> > users [2]
> > * the official Apache source release and binary convenience releases to
> be
> > deployed to dist.apache.org [3], which are signed with the key with
> > fingerprint A1BD477F79D036D2C30CA7DBCA8AEEC2F6EB040B [4],
> > * all artifacts to be deployed to the Maven Central Repository [5],
> > * source code tag "release-1.17.0-rc2" [6],
> > * website pull request listing the new release and adding announcement
> blog
> > post [7].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > [1]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
> > [2] https://github.com/apache/flink/pull/22146
> > [3] https://dist.apache.org/repos/dist/dev/flink/flink-1.17.0-rc2/
> > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [5]
> https://repository.apache.org/content/repositories/orgapacheflink-1595
> > [6] https://github.com/apache/flink/releases/tag/release-1.17.0-rc2
> > [7] https://github.com/apache/flink-web/pull/618
> >
> > Thanks,
> > Martijn and Matthias, Leonard and Qingsheng
> >
>