Re: [Discuss] Conventions on driving FLIPs towards consensus

2023-02-06 Thread Xintong Song
The discussion has been open for quite some time and there seems to be no
further input.

Thanks all for the discussion. I have updated the Flink Improvement
Proposals wiki page [1] with the discussed conventions.

Best,

Xintong


[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals



On Tue, Jan 31, 2023 at 11:15 AM Xintong Song  wrote:

> Thanks all for the feedback.
>
>
> @Dong,
>
> I fully agree that we should document the agreements on wiki. However, I'd
> hesitate about the MUST / SHOULD phrasing. I personally would consider
> these conventions, except for the one quoted from the ASF policy, as
> empirical inputs for individual judgements, where MUST / SHOULD might be a
> bit too strong.
>
> @Matthias,
>
> Adding a reminder about holiday seasons makes sense to me. I'd suggest to
> narrow this down to public holidays only, excluding personal vacations. At
> the moment, I'm not aware of any public holidays other than Christmas and
> Chinese New Year that need to be noticed in this context. Pardon me for my
> unfamiliarity with the diverse cultures. And of course we can add other
> holidays, if proposed, to the list.
>
>
> @Jing,
>
> I generally agree with your two suggestions. Actually, I think they are
> already implied by the proposed conventions. It might be nice to keep
> the documented conventions concise. And just to clarify, the proposal here
> is to provide some suggestions / guidance on how to drive FLIPs towards
> consensus, rather than formally change / improve the FLIP process.
>
>
> Best,
>
> Xintong
>
>
>
> On Mon, Jan 30, 2023 at 8:16 PM Jing Ge 
> wrote:
>
>> Thanks all for the valuable information. It is a great step toward making
>> the FLIP process run more smoothly.
>>
>> I'd like to share two additional suggestions:
>>
>> 1. Veto with care - A voter should speak for themselves and support a veto
>> with their own technical justification. No veto is allowed that is based
>> only on someone else's opinion.
>> 2. Vacation or holiday seasons - It is a personal choice to join a
>> discussion and voting process during the vacation and holiday season. The
>> focus should be on the FLIP process to avoid blocking it. If the voter
>> decides to get involved, especially with a veto, then she/he should be
>> responsive and responsible during the vacation time. Or, she/he could
>> enjoy
>> the vacation, stay on the sideline and trust others who participate.
>>
>> All improvements w.r.t. the FLIP process should be formally documented and
>> can be referenced if any FLIP runs into blocking issues in the future.
>>
>> Best regards,
>> Jing
>>
>> On Mon, Jan 30, 2023 at 10:20 AM Matthias Pohl
>>  wrote:
>>
>> > Thanks Xintong for summarizing the PMC discussion here. I agree that
>> > working on trust instead of imposing more rules is the right thing to
>> do.
>> > But I see Dong's point on documenting such an agreement in some way to
>> give
>> > guidance to new contributors.
>> >
>> > One thing that I think might be helpful to include is holidays. When and
>> for
>> > how long someone decides to go on holiday is an individual decision.
>> > Therefore, dealing with it on a per-case basis and being transparent
>> about
>> > it in FLIP discussions is reasonable and should be encouraged.
>> >
>> > That said, there are major holiday seasons. I guess, for the Flink
>> > community, Chinese New Year and Christmas (but maybe also the summer
>> > months) are times where it's more likely for people to plan their
>> vacation.
>> > I'm not proposing to disallow FLIP discussions during that time but
>> adding
>> > a reminder that these vacation periods exist. ...if we decide to move
>> this
>> > into some form of documentation. Just to raise awareness and make FLIP
>> > creators be considerate of these times of the year when planning the
>> > work.
>> >
>> > Of course, this should be formulated in an inclusive manner since there
>> are
>> > also contributors from other parts of the world.
>> >
>> > Best,
>> > Matthias
>> >
>> > On Sat, Jan 28, 2023 at 10:26 AM Dong Lin  wrote:
>> >
>> > > Thanks Xintong for initiating and sharing the discussion!
>> > >
>> > > The agreement described in the summary looks pretty good. In
>> particular,
>> > it
>> > > is great to know that ASF already has guidance requiring "voter must
>> > > provide with the veto a technical justification showing why the
>> change is
>> > > bad". This is exactly what we need. This is an important step towards
>> > > making our FLIP discussion process more collaborative and productive
>> :)
>> > >
>> > > While I agree it is important to work on trust, it is probably still
>> > useful
>> > > to document the agreement described above in the wiki, so that new
>> Flink
>> > > contributors can follow this guidance easily rather than having to
>> read
>> > > this email thread. For example, we can probably have something like
>> this
>> > > <
>> https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
>> > >
>> > > guideline
>> 

[jira] [Created] (FLINK-30934) Refactor ComputedColumnAndWatermarkTableITCase to get rid of managed table

2023-02-06 Thread yuzelin (Jira)
yuzelin created FLINK-30934:
---

 Summary: Refactor ComputedColumnAndWatermarkTableITCase to get rid 
of managed table
 Key: FLINK-30934
 URL: https://issues.apache.org/jira/browse/FLINK-30934
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-02-06 Thread Qingsheng Ren
Thanks for the update, Dong!

+1 for the new table option.
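
For context, a rough sketch of how the new option might be set from the Table API.
Only the key 'scan.record.evaluator.class' comes from this thread; the connector
options, schema, and the evaluator class name below are illustrative placeholders.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Illustrative sketch only: a Kafka table descriptor configuring the proposed
// 'scan.record.evaluator.class' option; all other values are placeholders.
TableDescriptor kafkaTable =
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder().column("msg", DataTypes.STRING()).build())
                .option("topic", "my-topic")
                .option("properties.bootstrap.servers", "localhost:9092")
                .option("format", "json")
                .option("scan.record.evaluator.class", "com.example.MyRecordEvaluator")
                .build();

// tableEnv.createTemporaryTable("input_topic", kafkaTable);
```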

Best,
Qingsheng

On Mon, Feb 6, 2023 at 2:39 PM Dong Lin  wrote:

> As I am reviewing the FLIP-208 PR here
> , I realized that a new public API
> change was added to the Proposed Change section in this
>  modification after this voting
> thread started, without being mentioned in this thread.
>
> I have moved this change to the Public Interface section now. The change is
> that "a new connector option 'scan.record.evaluator.class' will be added to
> provide the custom RecordEvaluator class".
>
> Since this change is relatively minor and looks good to me, I will re-use
> this voting thread to confirm this change is OK.
>
> Qingsheng and Leonard: can you help check whether this public interface
> change looks good to you?
>
> I will keep this discussion open for at least 72 hours before merging the
> PR.
>
> Thanks,
> Dong
>
>
> On Tue, Dec 27, 2022 at 3:29 PM Leonard Xu  wrote:
>
> > +1 (binding)
> >
> > Best,
> > Leonard
> >
> >
> > > On Dec 26, 2022, at 4:22 PM, Qingsheng Ren  wrote:
> > >
> > > +1 (binding)
> > >
> > > Best,
> > > Qingsheng
> > > Ververica (Alibaba)
> > >
> > > On Wed, Dec 21, 2022 at 3:13 PM Dong Lin  wrote:
> > >
> > >> Hi all,
> > >>
> > >> We would like to start the vote for FLIP-208: Add RecordEvaluator to
> > >> dynamically stop source based on de-serialized records [1]. This FLIP
> > was
> > >> discussed in this thread [2].
> > >>
> > >> This feature is needed by users who currently depend on
> > >> KafkaDeserializationSchema::isEndOfStream() to migrate their Flink job
> > from
> > >> FlinkKafkaConsumer to KafkaSource. And we won't be able to
> > >> remove FlinkKafkaConsumer and FlinkKafkaProducer before adding this
> > >> feature.
> > >>
> > >> Thanks,
> > >> Dong
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
> > >> [2] https://lists.apache.org/thread/z87m68ggzkx0s427tmrllswm4l1g7owc
> > >>
> >
> >
>


Re: [VOTE] FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-02-06 Thread Leonard Xu


> Since this change is relatively minor and looks good to me, I will re-use 
> this voting thread to confirm this change is OK.
> 
> Qingsheng and Leonard: can you help check whether this public interface 
> change looks good to you?
> 

Thanks Dong and Hang for driving this.
 
The change looks good to me, and +1 from my side.

Best,
Leonard



Re: Flink Kubernetes Operator 1.4.0 release planning

2023-02-06 Thread Yang Wang
Thanks Gyula for driving the release again.

It is really exciting to see the auto-scaling coming out of the box.

Best,
Yang

Gyula Fóra  wrote on Mon, Feb 6, 2023 at 19:43:

> Hi Devs!
>
> Based on the previously agreed upon release schedule (
>
> https://cwiki.apache.org/confluence/display/FLINK/Release+Schedule+and+Planning
> )
> it is almost time for the 1.4.0 release.
>
> There are still a number of smaller but important PRs open for some
> critical fixes. I would like to merge those in the next 1-2 days and I
> suggest we make the release cut on Wednesday/Thursday.
>
> After that we should spend some time testing the release candidate and
> hopefully we can finalize the release next week!
>
> I volunteer as the release manager.
>
> Cheers,
> Gyula
>


[jira] [Created] (FLINK-30933) Result of join inside iterationBody loses max watermark

2023-02-06 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30933:
-

 Summary: Result of join inside iterationBody loses max watermark
 Key: FLINK-30933
 URL: https://issues.apache.org/jira/browse/FLINK-30933
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
Reporter: Zhipeng Zhang
 Fix For: ml-2.2.0


Currently, if we execute a join inside an iteration body, the following program 
produces an empty output (the correct result should be a list with {1, 2}).
{code:java}
public class Test {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000L);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(1);

        DataStream<Tuple2<Long, Integer>> input1 =
                env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));

        DataStream<Tuple2<Long, Long>> input2 =
                env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));

        DataStream<Tuple2<Long, Long>> iterationJoin =
                Iterations.iterateBoundedStreamsUntilTermination(
                                DataStreamList.of(input1),
                                ReplayableDataStreamList.replay(input2),
                                IterationConfig.newBuilder()
                                        .setOperatorLifeCycle(
                                                IterationConfig.OperatorLifeCycle.PER_ROUND)
                                        .build(),
                                new MyIterationBody())
                        .get(0);

        DataStream<Long> left = iterationJoin.map(x -> x.f0);
        DataStream<Long> right = iterationJoin.map(x -> x.f0);
        DataStream<Long> result =
                left.join(right)
                        .where(x -> x)
                        .equalTo(x -> x)
                        .window(EndOfStreamWindows.get())
                        .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);

        List<Long> collectedResult =
                IteratorUtils.toList(result.executeAndCollect());
        List<Long> expectedResult = Arrays.asList(1L, 2L);
        compareResultCollections(expectedResult, collectedResult, Long::compareTo);
    }

    private static class MyIterationBody implements IterationBody {
        @Override
        public IterationBodyResult process(
                DataStreamList variableStreams, DataStreamList dataStreams) {
            DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
            DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);

            DataStream<Integer> terminationCriteria =
                    input1.flatMap(new TerminateOnMaxIter<>(1));

            DataStream<Tuple2<Long, Long>> res =
                    input1.join(input2)
                            .where(x -> x.f0)
                            .equalTo(x -> x.f0)
                            .window(EndOfStreamWindows.get())
                            .apply(
                                    (JoinFunction<
                                                    Tuple2<Long, Integer>,
                                                    Tuple2<Long, Long>,
                                                    Tuple2<Long, Long>>)
                                            (t1, t2) -> t2);

            return new IterationBodyResult(
                    DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria);
        }
    }
}
{code}
 

There are two possible reasons:
 * The timer in `HeadOperator` is not a daemon process and it does not exit 
even when the Flink job finishes.
 * The max watermark from the iteration body is lost.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-274: Introduce metric group for OperatorCoordinator

2023-02-06 Thread weijie guo
+1 (non-binding)

Best regards,

Weijie


Mason Chen  wrote on Tue, Feb 7, 2023 at 10:11:

> +1 (non-binding)
>
> Best,
> Mason
>
>
> On Thu, Feb 2, 2023 at 6:03 PM Leonard Xu  wrote:
>
> > +1
> >
> > Best,
> > Leonard
> >
> > > On Feb 3, 2023, at 9:49 AM, Dong Lin  wrote:
> > >
> > > +1
> > >
> > > On Thu, Feb 2, 2023 at 9:31 PM Hang Ruan 
> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for all the help about this FLIP. Now let's start the vote
> again.
> > >> Based on the discussion[1], we have come to a consensus, so I would
> > like to
> > >> start a vote on FLIP-274: Introduce metric group for
> > >> OperatorCoordinator[2].
> > >>
> > >> The vote will last for at least 72 hours (Feb 8th at 11:00 GMT) unless
> > >> there is an objection or insufficient votes.
> > >>
> > >> [1] https://lists.apache.org/thread/63m9w60rndqnrqvgb6qosvt2bcbww53k
> > >> [2]
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator
> > >>
> > >> Best,
> > >> Hang
> > >>
> >
> >
>


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-06 Thread Hang Ruan
Hi Feng,
I agree with what Jark said. I think what you are looking for is lazy
initialization.

I don't think we should introduce the new interface CatalogProvider for
lazy initialization. What we should do is to store the catalog properties
and initialize the catalog when we need it. Could you please describe some
other scenarios where we need the CatalogProvider besides lazy
initialization?
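
To make that concrete, here is a minimal, illustrative sketch (not an existing
Flink API; the class and helper names are made up) of keeping only the catalog
properties and instantiating the Catalog on first access:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.flink.table.catalog.Catalog;

/** Illustrative only: lazily instantiate catalogs from stored properties. */
public class LazyCatalogRegistry {

    // Properties of catalogs that were registered but not yet instantiated.
    private final Map<String, Map<String, String>> pendingProperties = new HashMap<>();
    // Catalogs that have already been instantiated and opened.
    private final Map<String, Catalog> initializedCatalogs = new HashMap<>();

    /** Register only the properties; no Catalog instance is created here. */
    public void registerCatalog(String name, Map<String, String> properties) {
        pendingProperties.put(name, properties);
    }

    /** Instantiate the catalog on first use and cache it afterwards. */
    public Optional<Catalog> getCatalog(String name) {
        Catalog cached = initializedCatalogs.get(name);
        if (cached != null) {
            return Optional.of(cached);
        }
        Map<String, String> properties = pendingProperties.remove(name);
        if (properties == null) {
            return Optional.empty();
        }
        Catalog catalog = createFromProperties(name, properties);
        catalog.open();
        initializedCatalogs.put(name, catalog);
        return Optional.of(catalog);
    }

    private Catalog createFromProperties(String name, Map<String, String> properties) {
        // Placeholder: in Flink this would go through the catalog factory
        // discovery mechanism rather than a hand-written method.
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}
```

Whether this logic lives in CatalogManager itself (as Jark suggests) or behind a
provider interface is then mostly a question of where the properties are stored.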

If we really need the CatalogProvider, I think it is better for it to be a single
instance. Multiple instances are difficult to manage and there are name
conflicts among providers.

Best,
Hang

Jark Wu  wrote on Tue, Feb 7, 2023 at 10:48:

> Hi Feng,
>
> I think this feature makes a lot of sense. If I understand correctly, what
> you are looking for is lazy catalog initialization.
>
> However, I have some concerns about introducing CatalogProvider, which
> delegates catalog management to users. It may be hard to avoid conflicts
> and duplicates between CatalogProvider and CatalogManager. Is it possible
> to have a built-in CatalogProvider to instantiate catalogs lazily?
>
> An idea in my mind is to introduce another catalog registration API
> without instantiating the catalog, e.g., registerCatalog(String
> catalogName, Map<String, String> catalogProperties). The catalog
> information is stored in CatalogManager as pure strings. The catalog is
> instantiated and initialized when used.
>
> This new API is very similar to other pure-string metadata registration,
> such as "createTable(String path, TableDescriptor descriptor)" and
> "createFunction(String path, String className, List
> resourceUris)".
>
> Can this approach satisfy your requirement?
>
> Best,
> Jark
>
> On Mon, 6 Feb 2023 at 22:53, Timo Walther  wrote:
>
> > Hi Feng,
> >
> > this is indeed a good proposal.
> >
> > 1) It makes sense to improve the catalog listing for platform providers.
> >
> > 2) Other feedback from the past has shown that users would like to avoid
> > the default in-memory catalog and offer their catalog before a
> > TableEnvironment session starts.
> >
> > 3) Also we might reconsider whether a default catalog and default
> > database make sense. Or whether this can be disabled and SHOW CATALOGS
> > can be used for listing first without having a default catalog.
> >
> > What do you think about option 2 and 3?
> >
> > In any case, I would propose we pass a CatalogProvider to
> > EnvironmentSettings and only allow a single instance. Catalogs should
> > never shadow other catalogs.
> >
> > We could also use the org.apache.flink.table.factories.Factory infra and
> > allow catalog providers via pure string properties. Not sure if we need
> > this in the first version though.
> >
> > Cheers,
> > Timo
> >
> >
> > On 06.02.23 11:21, Feng Jin wrote:
> > > Hi everyone,
> > >
> > > The original discussion address is
> > > https://issues.apache.org/jira/browse/FLINK-30126
> > >
> > > Currently, Flink has access to many systems, including kafka, hive,
> > > iceberg, hudi, elasticsearch, mysql...  The corresponding catalog name
> > > might be:
> > > kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
> > > iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
> > > mysql_database2_
> > >
> > > As the platform of the Flink SQL job, we need to maintain the meta
> > > information of each system of the company, and when the Flink job
> > > starts, we need to register the catalog with the Flink table
> > > environment, so that users can use any table through the
> > > env.executeSql interface.
> > >
> > > When we only have a small number of catalogs, we can register like
> > > this, but when there are thousands of catalogs, I think that there
> > > needs to be a dynamic loading mechanism that we can register catalog
> > > when needed, speed up the initialization of the table environment, and
> > > avoid the useless catalog registration process.
> > >
> > > Preliminary thoughts:
> > >
> > > A new CatalogProvider interface can be added:
> > > It contains two interfaces:
> > > * listCatalogs() interface, which can list all the catalogs that the
> > > provider can provide
> > > * getCatalog() interface,  which can get a catalog instance by catalog
> > name.
> > >
> > > ```java
> > > public interface CatalogProvider {
> > >
> > >     default void initialize(ClassLoader classLoader, ReadableConfig config) {}
> > >
> > >     Optional<Catalog> getCatalog(String catalogName);
> > >
> > >     Set<String> listCatalogs();
> > > }
> > > ```
> > >
> > >
> > > The corresponding implementation in CatalogManager is as follows:
> > >
> > > ```java
> > > public class CatalogManager {
> > >     private @Nullable CatalogProvider catalogProvider;
> > >
> > >     private Map<String, Catalog> catalogs;
> > >
> > >     public void setCatalogProvider(CatalogProvider catalogProvider) {
> > >         this.catalogProvider = catalogProvider;
> > >     }
> > >
> > >     public Optional<Catalog> getCatalog(String catalogName) {
> > >         // If there is no corresponding catalog in catalogs,
> > >         // get catalog by 

Re: [DISCUSS] FLIP-289: Support online inference (Flink ML)

2023-02-06 Thread Dong Lin
Hi all,

If there is no question related to this FLIP, we will start the voting
thread on 2/10.

Regards,
Dong

On Wed, Feb 1, 2023 at 8:38 PM Dong Lin  wrote:

> Hi all,
>
> Fan, Jiang, Zhipeng, and I have created FLIP-289: Support online inference
> (Flink ML).
>
> The goal of this FLIP is to enable users to use the model trained by Flink
> ML to do online inference. More details can be found in the FLIP doc at
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240881268
> .
>
> We are looking forward to your comments.
>
> Regards,
> Dong
>


Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread Steven Wu
> Regarding the discussion on global committer [1] for sinks with global
transactions, there is no consensus on solving that problem in SinkV2. Will
it require any breaking change in SinkV2?

Just want to reiterate my earlier question. What is the migration path for
the Iceberg sink?

[1] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj

On Mon, Feb 6, 2023 at 6:22 PM Jark Wu  wrote:

> Hi Konstantin,
>
> I totally agree with making SinkV2 @Public. I just have concerns about
> deprecating SinkFunction at this point. Dong Lin has raised the blocker
> issues of migration multiple times in this thread which I think we should
> address first. I don't know why we rush to deprecate SinkFunction while
> SourceFunction is still public, but the new Source API is much more stable
> and production-ready than SinkV2.
>
> Iceberg community raised concerns[1] about the workability and stability of
> Flink connector APIs.
>
> We are hoping any issues with the APIs for Iceberg connector will surface
> > sooner and get more attention from the Flink community when the connector
> > is within Flink umbrella rather than in Iceberg repo.
>
>
> Externalizing the connectors is a big step toward building a mechanism to
> guarantee that the Flink connector API is stable and workable. The follow-up step
> should be trying the new APIs in externalized connectors and giving users
> the confidence to migrate.
>
>
> Best,
> Jark
>
> [1]: https://lists.apache.org/thread/r5zqnkt01x2c611brkjmxxnt3bfcgl1b
>
> On Tue, 7 Feb 2023 at 09:53, yuxia  wrote:
>
> > Hi Konstantin,
> > Just FYI, the FileSystemTableSink are still using SinkFunction.
> >
> > Best regards,
> > Yuxia
> >
> > ----- Original Message -----
> > From: "Dong Lin" 
> > To: "dev" 
> > Cc: "Jing Ge" , "Yun Tang" 
> > Sent: Tuesday, February 7, 2023, 9:41:07 AM
> > Subject: Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction
> >
> > Hi Konstantin,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Mon, Feb 6, 2023 at 9:48 PM Konstantin Knauf 
> wrote:
> >
> > > Hi Steven,
> > >
> > > Sink is already deprecated. It was deprecated at the moment where we
> > > introduced SinkV2.
> > >
> > > Hi Jark, Hi Dong,
> > >
> > > My understanding is the SinkV2 is a workable interface. The most
> > important
> > > connectors have been migrated (Kafka, Filesystem) and more connectors
> > > (OpenSearch, ElasticSearch, Kinesis) use it successfully. To make
> SinkV2
> > > public, it does not need to have all possible functionality. Public
> APIs
> > > can be extended. That's what we do all the time. There will also always
> > be
> > > bugs. So, these points can not be categorical blockers to promote the
> > API.
> > >
> >
> > Right. I believe we all agree that we make SinkV2 @PublicEvolving. The
> > concern here is whether we can mark SinkFunction as deprecated at this
> > point.
> >
> >
> > >
> > > What are the specific issues/tickets that are blocking us? Can we in
> your
> > >
> >
> > For example, Lijie mentioned earlier in this thread that according to
> > FLIP-287,
> > currently the Sink.InitContext still lacks some necessary information to
> > migrate existing connectors to new sinks. This could be a blocker issue
> > since this is related to the SinkV2 API design.
> >
> > And Yuxia mentioned earlier in this thread that there are bugs such as
> > FLINK-30238  and
> > FLINK-29459  , which
> > makes it hard to use SinkV2 properly in production.  It seems that these
> > two bugs are created months ago and are still unresolved or even
> > unassigned. This looks like a clear signal that SinkV2 is not being
> > actively maintained and used in production.
> >
> >
> > > opinion only deprecate it when every single connector in Apache Flink
> is
> > > migrated already?
> > >
> >
> > Technically this is not a hard requirement. But I would suggest that we
> > should migrate all existing connectors so that we eat our own dogfood and
> > prove to users that SinkV2 is ready for use in production.
> >
> >
> > > In my opinion it is the time to ask users to the migrate their
> > connectors.
> > > More importantly, @Deprecated would signal users not to build new
> > > connectors on SinkFunction. I would argue it's also very misleading to
> > users
> > > to not @Deprecated SinkFunction given that it clearly will be
> deprecated.
> >
> >
> >
> >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > >
> > > Am Mo., 6. Feb. 2023 um 13:26 Uhr schrieb Jark Wu :
> > >
> > > > I agree with Dong Lin.
> > > >
> > > > Oracle explains how to use Deprecate API [1]:
> > > >
> > > > You are strongly recommended to use the Javadoc @deprecated tag with
> > > > > appropriate comments explaining how to use the new API. This
> ensures
> > > > > developers will *have a workable migration path from the old API to
> > the
> > > > > new API*.
> > > >
> > > >
> > > > From a user's perspective, the workable migration path is 

Re: Need help how to use Table API to join two Kafka streams

2023-02-06 Thread yuxia
Hi, could you please share the root cause with us? 
It seems the error message you posted doesn't contain the root cause. Maybe you 
can post the full error message. 

Best regards, 
Yuxia 


发件人: "Amir Hossein Sharifzadeh"  
收件人: "yuxia"  
抄送: "dev"  
发送时间: 星期二, 2023年 2 月 07日 上午 10:39:25 
主题: Re: Need help how to use Table API to join two Kafka streams 

Thank you for your reply. I tried it with a sample stream but it did not work. I
am trying to get the results from my producer here with a very simple query. I
want to see results in the console/output.

This is my code:

// Docker: docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0


// Producer
import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = "127.0.0.1:9092"

def serializer(dictionary):
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')

def create_sample_empad_json(raw_id):
    return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}

def do_produce():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
    for raw_id in range(1, 10):
        empad_json = data_helper.create_sample_empad_json(raw_id)
        producer.send('EMPAD', empad_json)

    producer.flush()

if __name__ == '__main__':
    do_produce(XRD_PATH)


// Flink
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment

def data_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = f"""
        CREATE TEMPORARY TABLE raw_table(
            raw_id INT,
            raw_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'EMPAD',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t_env.execute_sql(t1)

    table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")

    with table_result.collect() as results:
        for result in results:
            print(result)

if __name__ == '__main__':
    data_processing()


getting this error message: 
pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Failed to execute sql 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:498) 
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) 
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 
at java.lang.Thread.run(Thread.java:750) 
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'collect'. 
at 

Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-02-06 Thread Shammon FY
Hi Piotr

Thanks for your feedback.

> - stateless operators, could completely ignore the issue and process the
records normally, as they are doing right now
> - stateful operators, should either:
> - if the business doesn't require ordering, they could process the
records immediately
> - or buffer the records internally, like currently windowed/temporal
operators are doing. Non windowed joins/aggregations could also work in a
similar manner, like pre-aggregate data per each "epoch" (as demarcated by
timestamp barriers).
> - sinks implementation would have to match what external system support:
> - if the external system requires ordered writes (something like
Kafka topic?), the sinks would have to buffer the writes until a "timestamp
barrier" arrives
> - some sinks might support writing the data simultaneously to
different "epochs". For example writing files bucketed by each epoch. Each
bucket/epoch could be committed independently

It sounds good to me and I totally agree with the proposal. We need to give
users more choices to meet different business needs and storage support. I
have updated the key points in the FLINK section[1]

> Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't
say that the timestamp barrier has been committed, but that all records for
given "epoch" have been processed/written, but not yet committed, so they
can still be rolled-back?

Nice! According to your suggestion, I have updated the FLIP for "epoch" as:
1. It is PROCESSED when records are written to a table
2. It is WRITTEN when the records are in a snapshot
3. It is PRECOMMIT when all tables are PROCESSED but not WRITTEN
4. It is COMMIT when all tables are WRITTEN
Records not WRITTEN in a table will be rolled back due to job failure.
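
Purely as an illustration of the state progression above (the enum and its
comments only restate the list; none of this is code from the FLIP):

```java
/** Illustrative only: the per-epoch states described above, in order. */
enum EpochState {
    PROCESSED, // records of the epoch have been written to a table
    WRITTEN,   // the records are contained in a snapshot of that table
    PRECOMMIT, // all tables are PROCESSED, but not all are WRITTEN yet
    COMMIT     // all tables are WRITTEN; records not WRITTEN are rolled back on job failure
}
```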


> Why do we need to do that? Only to disallow this? To forbid writing from
two jobs into a single table? If so, can we not push this responsibility
down to the connector? Like sink/source operator coordinators should
negotiate with respective external systems if the given read/write is
allowed? So if there is a need for such meta service, Flink doesn't need to
know about it?

As I mentioned, MetaService will do some atomic operations to check and
disallow some operations when jobs are submitted concurrently. But I'm
sorry that I may not have explained the relationship between it and
sink/source clearly. Generally speaking, the interaction between Flink and
MetaService is as follows:
1. When the Client submits a flink job (streaming), it interacts with
MetaService through Catalog in CatalogManager, including getting the table
version, registering the source/link table relationship for ETL.
2. When the flink job is running, JobManager collects data processing
progress (Timestamp Barrier and Checkpoint) from source/link subtasks and
reports them to MetaService.
We can implement the above functions in a MetaService node. Of course, it
can also be based on an atomic system (such as Zookeeper), with Client and
JobManager doing their own work.

Of course, source and sink also need some special work, such as reading
timestamp barrier, collecting timestamp barrier, writing timestamp barrier,
etc. But source/sink subtasks will not interact with MetaService directly.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-GlobalTimestampBarrierMechanism


Best,
Shammon


On Tue, Feb 7, 2023 at 1:26 AM Piotr Nowojski  wrote:

>  Hi,
>
> Thanks for the answers.
>
> >> Are you proposing that all of the inputs to stateful operators would
> have to be sorted?
> >>
> > Records in stream don't need to be sorted, but it should be managed by
> `Timestamp Barrier`, which means
> > 1. Records belonging to a specific `Timestamp Barrier` are disordered.
> > 2. Computations in different timestamp barriers are ordered. For the
> above
> > example, each stateful subtask can start computation for T2 only after it
> > finishes computation for T1. Subtasks are independent of each other.
>
> Wouldn't that add significant latency to processing the records? You would
> basically introduce a batch processing concept in Flink?
>
> Have you considered some alternative solutions? Like for example letting
> each operator/function/sink to take care of the data disorder? For example:
> - stateless operators, could completely ignore the issue and process the
> records normally, as they are doing right now
> - stateful operators, should either:
> - if the business doesn't require ordering, they could process the
> records immediately
> - or buffer the records internally, like currently windowed/temporal
> operators are doing. Non windowed joins/aggregations could also work in a
> similar manner, like pre-aggregate data per each "epoch" (as demarcated by
> timestamp barriers).
> - sinks implementation would have to match what external system support:
> 

Re: [DISCUSS] FLIP-291: Externalized Declarative Resource Management

2023-02-06 Thread weijie guo
Thanks David for driving this. This is very valuable work, especially for
cloud-native environments.

>> How about adding some more information such as vertex type
(SOURCE/MAP/JOIN and .etc) in the response of `get jobs
resource-requirements`? For users, only vertex-id may be difficult to
understand.

+1 for this suggestion; including the JobVertex name in the response body is
more user-friendly.


I saw this sentence in the FLIP: "Setting the upper bound to -1 will reset the
value to the default setting." What is the default value here (based on
which configuration), or is it just infinite?


Best regards,

Weijie



Shammon FY  wrote on Mon, Feb 6, 2023 at 18:06:

> Hi David
>
> Thanks for initiating this discussion. I think declaring job resource
> requirements by REST API is very valuable. I just left some comments as
> followed
>
> 1) How about adding some more information such as vertex type
> (SOURCE/MAP/JOIN and .etc) in the response of `get jobs
> resource-requirements`? For users, only vertex-id may be difficult to
> understand.
>
> 2) For sql jobs, we always use a unified parallelism for most vertices. Can
> we provide them with a more convenient setting method instead of each one?
>
>
> Best,
> Shammon
>
>
> On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl  .invalid>
> wrote:
>
> > Thanks David for creating this FLIP. It sounds promising and useful to
> > have. Here are some thoughts from my side (some of them might be rather a
> > follow-up and not necessarily part of this FLIP):
> > - I'm wondering whether it makes sense to add some kind of resource ID to
> > the REST API. This would give Flink a tool to verify the PATCH request of
> > the external system in a compare-and-set kind of manner. AFAIU, the
> process
> > requires the external system to retrieve the resource requirements first
> > (to retrieve the vertex IDs). A resource ID  would be sent along as
> a
> > unique identifier for the provided setup. It's essentially the version ID
> > of the currently deployed resource requirement configuration. Flink
> doesn't
> > know whether the external system would use the provided information in
> some
> > way to derive a new set of resource requirements for this job. The
> > subsequent PATCH request with updated resource requirements would include
> > the previously retrieved resource ID . The PATCH call would fail if
> > there was a concurrent PATCH call in between indicating to the external
> > system that the resource requirements were concurrently updated.
> > - How often do we allow resource requirements to be changed? That
> question
> > might make my previous comment on the resource ID obsolete because we
> could
> > just make any PATCH call fail if there was a resource requirement update
> > within a certain time frame before the request. But such a time period is
> > something we might want to make configurable then, I guess.
> > - Versioning the JobGraph in the JobGraphStore rather than overwriting it
> > might be an idea. This would enable us to provide resource requirement
> > changes in the UI or through the REST API. It is related to a problem
> > around keeping track of the exception history within the
> AdaptiveScheduler
> > and also having to consider multiple versions of a JobGraph. But for that
> > one, we use the ExecutionGraphInfoStore right now.
> > - Updating the JobGraph in the JobGraphStore makes sense. I'm just
> > wondering whether we bundle two things together that are actually
> separate:
> > The business logic and the execution configuration (the resource
> > requirements). I'm aware that this is not a flaw of the current FLIP but
> > rather something that was not necessary to address in the past because
> the
> > JobGraph was kind of static. I don't remember whether that was already
> > discussed while working on the AdaptiveScheduler for FLIP-160 [1]. Maybe,
> > I'm missing some functionality here that requires us to have everything
> in
> > one place. But it feels like updating the entire JobGraph which could be
> > actually a "config change" is not reasonable. ...also considering the
> > amount of data that can be stored in a ConfigMap/ZooKeeper node if
> > versioning the resource requirement change as proposed in my previous
> item
> > is an option for us.
> > - Updating the JobGraphStore means adding more requests to the HA backend
> > API. There were some concerns shared in the discussion thread [2] for
> > FLIP-270 [3] on pressuring the k8s API server in the past with too many
> > calls. Eventhough, it's more likely to be caused by checkpointing, I
> still
> > wanted to bring it up. We're working on a standardized performance test
> to
> > prepare going forward with FLIP-270 [3] right now.
> >
> > Best,
> > Matthias
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> > [2] https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
> > [3]
> >
> >
> 

Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-06 Thread Jark Wu
Hi Feng,

I think this feature makes a lot of sense. If I understand correctly, what
you are looking for is lazy catalog initialization.

However, I have some concerns about introducing CatalogProvider, which
delegates catalog management to users. It may be hard to avoid conflicts
and duplicates between CatalogProvider and CatalogManager. Is it possible
to have a built-in CatalogProvider to instantiate catalogs lazily?

An idea in my mind is to introduce another catalog registration API
without instantiating the catalog, e.g., registerCatalog(String
catalogName, Map<String, String> catalogProperties). The catalog
information is stored in CatalogManager as pure strings. The catalog is
instantiated and initialized when used.

This new API is very similar to other pure-string metadata registration,
such as "createTable(String path, TableDescriptor descriptor)" and
"createFunction(String path, String className, List
resourceUris)".

Can this approach satisfy your requirement?

Best,
Jark

On Mon, 6 Feb 2023 at 22:53, Timo Walther  wrote:

> Hi Feng,
>
> this is indeed a good proposal.
>
> 1) It makes sense to improve the catalog listing for platform providers.
>
> 2) Other feedback from the past has shown that users would like to avoid
> the default in-memory catalog and offer their catalog before a
> TableEnvironment session starts.
>
> 3) Also we might reconsider whether a default catalog and default
> database make sense. Or whether this can be disabled and SHOW CATALOGS
> can be used for listing first without having a default catalog.
>
> What do you think about option 2 and 3?
>
> In any case, I would propose we pass a CatalogProvider to
> EnvironmentSettings and only allow a single instance. Catalogs should
> never shadow other catalogs.
>
> We could also use the org.apache.flink.table.factories.Factory infra and
> allow catalog providers via pure string properties. Not sure if we need
> this in the first version though.
>
> Cheers,
> Timo
>
>
> On 06.02.23 11:21, Feng Jin wrote:
> > Hi everyone,
> >
> > The original discussion address is
> > https://issues.apache.org/jira/browse/FLINK-30126
> >
> > Currently, Flink has access to many systems, including kafka, hive,
> > iceberg, hudi, elasticsearch, mysql...  The corresponding catalog name
> > might be:
> > kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
> > iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
> > mysql_database2_
> >
> > As the platform of the Flink SQL job, we need to maintain the meta
> > information of each system of the company, and when the Flink job
> > starts, we need to register the catalog with the Flink table
> > environment, so that users can use any table through the
> > env.executeSql interface.
> >
> > When we only have a small number of catalogs, we can register like
> > this, but when there are thousands of catalogs, I think that there
> > needs to be a dynamic loading mechanism that we can register catalog
> > when needed, speed up the initialization of the table environment, and
> > avoid the useless catalog registration process.
> >
> > Preliminary thoughts:
> >
> > A new CatalogProvider interface can be added:
> > It contains two interfaces:
> > * listCatalogs() interface, which can list all the catalogs that the
> > provider can provide
> > * getCatalog() interface,  which can get a catalog instance by catalog
> name.
> >
> > ```java
> > public interface CatalogProvider {
> >
> >     default void initialize(ClassLoader classLoader, ReadableConfig config) {}
> >
> >     Optional<Catalog> getCatalog(String catalogName);
> >
> >     Set<String> listCatalogs();
> > }
> > ```
> >
> >
> > The corresponding implementation in CatalogManager is as follows:
> >
> > ```java
> > public class CatalogManager {
> >     private @Nullable CatalogProvider catalogProvider;
> >
> >     private Map<String, Catalog> catalogs;
> >
> >     public void setCatalogProvider(CatalogProvider catalogProvider) {
> >         this.catalogProvider = catalogProvider;
> >     }
> >
> >     public Optional<Catalog> getCatalog(String catalogName) {
> >         // If there is no corresponding catalog in catalogs,
> >         // get catalog by catalogProvider
> >         if (catalogProvider != null) {
> >             Optional<Catalog> catalog = catalogProvider.getCatalog(catalogName);
> >         }
> >     }
> >
> > }
> > ```
> >
> >
> >
> > Possible problems:
> >
> > 1. Catalog name conflict, how to choose when the registered catalog
> > and the catalog provided by catalog-provider conflict?
> > I prefer tableEnv-registered ones over catalogs provided by the
> > catalog-provider. If the user wishes to reference the catalog provided
> > by the catalog-provider, they can unregister the catalog in tableEnv
> > through the `unregisterCatalog` interface.
> >
> > 2. Number of CatalogProviders, is it possible to have multiple
> > catalogProvider implementations?
> > I don't have a good idea of this at the moment. If multiple
> > catalogProviders are supported, it brings much more 

Re: Need help how to use Table API to join two Kafka streams

2023-02-06 Thread Amir Hossein Sharifzadeh
Thank you for your reply. I tried it with a sample stream but it did not
work. I am trying to get the results from my producer here with a very
simple query. I want to see results in the console/output.

This is my code:

// Docker: docker-compose.yml

version: '2'
services:
  zookeeper:
image: confluentinc/cp-zookeeper:6.1.1
hostname: zookeeper
container_name: zookeeper
ports:
  - "2181:2181"
environment:
  ZOOKEEPER_CLIENT_PORT: 2181
  ZOOKEEPER_TICK_TIME: 2000

  broker:
image: confluentinc/cp-kafka:6.1.1
hostname: broker
container_name: broker
depends_on:
  - zookeeper
ports:
  - "29092:29092"
  - "9092:9092"
  - "9101:9101"
environment:
  KAFKA_BROKER_ID: 1
  KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
  KAFKA_ADVERTISED_LISTENERS:
PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0



// Producer

import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = "127.0.0.1:9092"

def serializer(dictionary):
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')

def create_sample_empad_json(raw_id):
    return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}

def do_produce():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
    for raw_id in range(1, 10):
        empad_json = data_helper.create_sample_empad_json(raw_id)
        producer.send('EMPAD', empad_json)

    producer.flush()

if __name__ == '__main__':
    do_produce(XRD_PATH)


// Flink

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment

def data_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = f"""
        CREATE TEMPORARY TABLE raw_table(
            raw_id INT,
            raw_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'EMPAD',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t_env.execute_sql(t1)

    table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")

    with table_result.collect() as results:
        for result in results:
            print(result)

if __name__ == '__main__':
    data_processing()



getting this error message:

pyflink.util.exceptions.TableException:
org.apache.flink.table.api.TableException: Failed to execute sql
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'collect'.
at 

Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread Jark Wu
Hi Konstantin,

I totally agree with making SinkV2 @Public. I just have concerns about
deprecating SinkFunction at this point. Dong Lin has raised the blocker
issues of migration multiple times in this thread which I think we should
address first. I don't know why we rush to deprecate SinkFunction while
SourceFunction is still public, but the new Source API is much more stable
and production-ready than SinkV2.

Iceberg community raised concerns[1] about the workability and stability of
Flink connector APIs.

We are hoping any issues with the APIs for Iceberg connector will surface
> sooner and get more attention from the Flink community when the connector
> is within Flink umbrella rather than in Iceberg repo.


Externalizing the connectors is a big step toward building a mechanism to
guarantee that the Flink connector API is stable and workable. The follow-up step
should be trying the new APIs in externalized connectors and giving users
the confidence to migrate.


Best,
Jark

[1]: https://lists.apache.org/thread/r5zqnkt01x2c611brkjmxxnt3bfcgl1b

On Tue, 7 Feb 2023 at 09:53, yuxia  wrote:

> Hi Konstantin,
> Just FYI, the FileSystemTableSink are still using SinkFunction.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Dong Lin" 
> To: "dev" 
> Cc: "Jing Ge" , "Yun Tang" 
> Sent: Tuesday, February 7, 2023, 9:41:07 AM
> Subject: Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction
>
> Hi Konstantin,
>
> Thanks for the reply. Please see my comments inline.
>
> On Mon, Feb 6, 2023 at 9:48 PM Konstantin Knauf  wrote:
>
> > Hi Steven,
> >
> > Sink is already deprecated. It was deprecated at the moment where we
> > introduced SinkV2.
> >
> > Hi Jark, Hi Dong,
> >
> > My understanding is the SinkV2 is a workable interface. The most
> important
> > connectors have been migrated (Kafka, Filesystem) and more connectors
> > (OpenSearch, ElasticSearch, Kinesis) use it successfully. To make SinkV2
> > public, it does not need to have all possible functionality. Public APIs
> > can be extended. That's what we do all the time. There will also always
> be
> > bugs. So, these points can not be categorical blockers to promote the
> API.
> >
>
> Right. I believe we all agree that we make SinkV2 @PublicEvolving. The
> concern here is whether we can mark SinkFunction as deprecated at this
> point.
>
>
> >
> > What are the specific issues/tickets that are blocking us? Can we in your
> >
>
> For example, Lijie mentioned earlier in this thread that according to
> FLIP-287,
> currently the Sink.InitContext still lacks some necessary information to
> migrate existing connectors to new sinks. This could be a blocker issue
> since this is related to the SinkV2 API design.
>
> And Yuxia mentioned earlier in this thread that there are bugs such as
> FLINK-30238  and
> FLINK-29459  , which
> makes it hard to use SinkV2 properly in production.  It seems that these
> two bugs are created months ago and are still unresolved or even
> unassigned. This looks like a clear signal that SinkV2 is not being
> actively maintained and used in production.
>
>
> > opinion only deprecate it when every single connector in Apache Flink is
> > migrated already?
> >
>
> Technically this is not a hard requirement. But I would suggest that we
> should migrate all existing connectors so that we eat our own dogfood and
> prove to users that SinkV2 is ready for use in production.
>
>
> > In my opinion it is the time to ask users to the migrate their
> connectors.
> > More importantly, @Deprecated would signal users not to build new
> > connectors on SinkFunction. I would argue it's also very misleading to
> users
> > to not @Deprecated SinkFunction given that it clearly will be deprecated.
>
>
>
>
> > Cheers,
> >
> > Konstantin
> >
> >
> > Am Mo., 6. Feb. 2023 um 13:26 Uhr schrieb Jark Wu :
> >
> > > I agree with Dong Lin.
> > >
> > > Oracle explains how to use Deprecate API [1]:
> > >
> > > You are strongly recommended to use the Javadoc @deprecated tag with
> > > > appropriate comments explaining how to use the new API. This ensures
> > > > developers will *have a workable migration path from the old API to
> the
> > > > new API*.
> > >
> > >
> > > From a user's perspective, the workable migration path is very
> important.
> > > Otherwise, it blurs the semantics of API deprecation. The Flink API's
> > > compatibility and stability issues in the past left a bad impression on
> > the
> > > downstream projects. We should be careful when changing and deprecating
> > > APIs, especially when there are known migration gaps. I think it's a
> good
> > > idea to migrate Flink-owned connectors before marking old API
> deprecated.
> > > This ensures downstream projects can migrate to new APIs smoothly.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]:
> > >
> > >
> >
> https://docs.oracle.com/javase/8/docs/technotes/guides/javadoc/deprecation/deprecation.html
> > >
> > > On 

[jira] [Created] (FLINK-30932) Enabling producer metrics for KafkaSink is not documented

2023-02-06 Thread Mason Chen (Jira)
Mason Chen created FLINK-30932:
--

 Summary: Enabling producer metrics for KafkaSink is not documented
 Key: FLINK-30932
 URL: https://issues.apache.org/jira/browse/FLINK-30932
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.17.0
Reporter: Mason Chen
 Fix For: 1.17.0


Users can enable producer metrics by setting `register.producer.metrics` to 
true. We should expose this as a ConfigOption so that it is covered by Flink's 
documentation generation process.
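
For reference, a minimal sketch of how this is done today without a dedicated
ConfigOption; the topic, bootstrap servers, and String serialization schema below
are placeholders.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Illustrative sketch of the current workaround: passing the property
// directly to the underlying producer via setProperty.
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setProperty("register.producer.metrics", "true")
                .build();
{code}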



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-connector-kafka, release candidate #1

2023-02-06 Thread Mason Chen
That makes sense, thanks for the clarification!

Best,
Mason

On Wed, Feb 1, 2023 at 7:16 AM Martijn Visser 
wrote:

> Hi Mason,
>
> Thanks, [4] is indeed a copy-paste error and you've made the right
> assumption that
>
> https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/
> is the correct maven central link.
>
> I think we should use FLINK-30052 to move the Kafka connector code from the
> 1.17 release over to the Kafka connector repo as well (especially since there's
> now a v3.0 branch for the Kafka connector, so it can be merged in main).
> When those commits have been merged, we can make a next Kafka connector
> release (which is equivalent to the 1.17 release, which can only be done
> when 1.17 is done because of the split level watermark alignment) and then
> FLINK-30859 can be finished.
>
> Best regards,
>
> Martijn
>
> Op wo 1 feb. 2023 om 09:16 schreef Mason Chen :
>
> > +1 (non-binding)
> >
> > * Verified hashes and signatures
> > * Verified no binaries
> > * Verified LICENSE and NOTICE files
> > * Verified poms point to 3.0.0-1.16
> > * Reviewed web PR
> > * Built from source
> > * Verified git tag
> >
> > I think your [4] is a copy-paste error and I did all the verification
> > assuming that
> >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/
> > is the correct maven central link.
> >
> > Regarding the release notes, should we close
> > https://issues.apache.org/jira/browse/FLINK-30052 and link it there?
> I've
> > created https://issues.apache.org/jira/browse/FLINK-30859 to remove the
> > existing code from the master branch.
> >
> > Best,
> > Mason
> >
> > On Tue, Jan 31, 2023 at 6:23 AM Martijn Visser  >
> > wrote:
> >
> > > Hi everyone,
> > > Please review and vote on the release candidate #1 for
> > > flink-connector-kafka version 3.0.0, as follows:
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > > Note: this is the same code as the Kafka connector for the Flink 1.16
> > > release.
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source release to be deployed to dist.apache.org
> > > [2],
> > > which are signed with the key with fingerprint
> > > A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > * source code tag v3.0.0-rc1 [5],
> > > * website pull request listing the new release [6].
> > >
> > > The vote will be open for at least 72 hours. It is adopted by majority
> > > approval, with at least 3 PMC affirmative votes.
> > >
> > > Thanks,
> > > Release Manager
> > >
> > > [1]
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352577
> > > [2]
> > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc1
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc1/
> > > [5]
> > >
> https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc1
> > > [6] https://github.com/apache/flink-web/pull/606
> > >
> >
>


Re: [VOTE] FLIP-274: Introduce metric group for OperatorCoordinator

2023-02-06 Thread Mason Chen
+1 (non-binding)

Best,
Mason


On Thu, Feb 2, 2023 at 6:03 PM Leonard Xu  wrote:

> +1
>
> Best,
> Leonard
>
> > On Feb 3, 2023, at 9:49 AM, Dong Lin  wrote:
> >
> > +1
> >
> > On Thu, Feb 2, 2023 at 9:31 PM Hang Ruan  wrote:
> >
> >> Hi all,
> >>
> >> Thanks for all the help about this FLIP. Now let's start the vote again.
> >> Based on the discussion[1], we have come to a consensus, so I would
> like to
> >> start a vote on FLIP-274: Introduce metric group for
> >> OperatorCoordinator[2].
> >>
> >> The vote will last for at least 72 hours (Feb 8th at 11:00 GMT) unless
> >> there is an objection or insufficient votes.
> >>
> >> [1] https://lists.apache.org/thread/63m9w60rndqnrqvgb6qosvt2bcbww53k
> >> [2]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator
> >>
> >> Best,
> >> Hang
> >>
>
>


Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread yuxia
Hi Konstantin,
Just FYI, the FileSystemTableSink is still using SinkFunction.

Best regards,
Yuxia

----- Original Message -----
From: "Dong Lin" 
To: "dev" 
Cc: "Jing Ge" , "Yun Tang" 
Sent: Tuesday, February 7, 2023, 9:41:07 AM
Subject: Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

Hi Konstantin,

Thanks for the reply. Please see my comments inline.

On Mon, Feb 6, 2023 at 9:48 PM Konstantin Knauf  wrote:

> Hi Steven,
>
> Sink is already deprecated. It was deprecated at the moment where we
> introduced SinkV2.
>
> Hi Jark, Hi Dong,
>
> My understanding is the SinkV2 is a workable interface. The most important
> connectors have been migrated (Kafka, Filesystem) and more connectors
> (OpenSearch, ElasticSearch, Kinesis) use it successfully. To make SinkV2
> public, it does not need to have all possible functionality. Public APIs
> can be extended. That's what we do all the time. There will also always be
> bugs. So, these points can not be categorical blockers to promote the API.
>

Right. I believe we all agree that we make SinkV2 @PublicEvolving. The
concern here is whether we can mark SinkFunction as deprecated at this
point.


>
> What are the specific issues/tickets that are blocking us? Can we in your
>

For example, Lijie mentioned earlier in this thread that according to FLIP-287,
currently the Sink.InitContext still lacks some necessary information to
migrate existing connectors to new sinks. This could be a blocker issue
since this is related to the SinkV2 API design.

And Yuxia mentioned earlier in this thread that there are bugs such as
FLINK-30238 and FLINK-29459, which make it hard to use SinkV2 properly in
production. It seems that these two bugs were created months ago and are
still unresolved or even unassigned. This looks like a clear signal that
SinkV2 is not being actively maintained and used in production.


> opinion only deprecate it when every single connector in Apache Flink is
> migrated already?
>

Technically this is not a hard requirement. But I would suggest that we
should migrate all existing connectors so that we eat our own dogfood and
prove to users that SinkV2 is ready for use in production.


> In my opinion it is time to ask users to migrate their connectors.
> More importantly, @Deprecated would signal users not to build new
> connectors on SinkFunction. I would argue it's also very misleading to users
> to not mark SinkFunction @Deprecated given that it clearly will be deprecated.




> Cheers,
>
> Konstantin
>
>
> Am Mo., 6. Feb. 2023 um 13:26 Uhr schrieb Jark Wu :
>
> > I agree with Dong Lin.
> >
> > Oracle explains how to use Deprecate API [1]:
> >
> > You are strongly recommended to use the Javadoc @deprecated tag with
> > > appropriate comments explaining how to use the new API. This ensures
> > > developers will *have a workable migration path from the old API to the
> > > new API*.
> >
> >
> > From a user's perspective, the workable migration path is very important.
> > Otherwise, it blurs the semantics of API deprecation. The Flink API's
> > compatibility and stability issues in the past left a bad impression on
> the
> > downstream projects. We should be careful when changing and deprecating
> > APIs, especially when there are known migration gaps. I think it's a good
> > idea to migrate Flink-owned connectors before marking old API deprecated.
> > This ensures downstream projects can migrate to new APIs smoothly.
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> https://docs.oracle.com/javase/8/docs/technotes/guides/javadoc/deprecation/deprecation.html
> >
> > On Mon, 6 Feb 2023 at 10:01, Steven Wu  wrote:
> >
> > > Regarding the discussion on global committer [1] for sinks with global
> > > transactions, there is no consensus on solving that problem in SinkV2.
> > Will
> > > it require any breaking change in SinkV2?
> > >
> > > Also will SinkV1 be deprecated too? or it should happen sometime after
> > > SinkFunction deprecation?
> > >
> > > [1] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
> > >
> > > On Sun, Feb 5, 2023 at 2:14 AM Dong Lin  wrote:
> > >
> > > > Hi Konstantin,
> > > >
> > > > Thanks for the comment! Please see my comment inline.
> > > >
> > > > Cheers,
> > > > Dong
> > > >
> > > > On Sat, Feb 4, 2023 at 2:06 AM Konstantin Knauf 
> > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > sorry for joining the discussion late.
> > > > >
> > > > > 1) Is there an option to deprecate SinkFunction in Flink 1.17 while
> > > > leaving
> > > > > SinkV2 @PublicEvolving in Flink 1.17. We then aim to make SinkV2
> > > @Public
> > > > in
> > > > > and remove SinkFunction in Flink 1.18. @PublicEvolving are intended
> > for
> > > > > public use. So, I don't see it as a blocker for deprecating
> > > SinkFunction
> > > > > that we have to make SinkV2 @PublicEvovling. For reference this is
> > the
> > > > > description of 

Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread Dong Lin
Hi Konstantin,

Thanks for the reply. Please see my comments inline.

On Mon, Feb 6, 2023 at 9:48 PM Konstantin Knauf  wrote:

> Hi Steven,
>
> Sink is already deprecated. It was deprecated at the moment where we
> introduced SinkV2.
>
> Hi Jark, Hi Dong,
>
> My understanding is the SinkV2 is a workable interface. The most important
> connectors have been migrated (Kafka, Filesystem) and more connectors
> (OpenSearch, ElasticSearch, Kinesis) use it successfully. To make SinkV2
> public, it does not need to have all possible functionality. Public APIs
> can be extended. That's what we do all the time. There will also always be
> bugs. So, these points can not be categorical blockers to promote the API.
>

Right. I believe we all agree that we make SinkV2 @PublicEvolving. The
concern here is whether we can mark SinkFunction as deprecated at this
point.


>
> What are the specific issues/tickets that are blocking us? Can we in your
>

For example, Lijie mentioned earlier in this thread that according to FLIP-287,
currently the Sink.InitContext still lacks some necessary information to
migrate existing connectors to new sinks. This could be a blocker issue
since this is related to the SinkV2 API design.

And Yuxia mentioned earlier in this thread that there are bugs such as
FLINK-30238 and FLINK-29459, which make it hard to use SinkV2 properly in
production. It seems that these two bugs were created months ago and are
still unresolved or even unassigned. This looks like a clear signal that
SinkV2 is not being actively maintained and used in production.


> opinion only deprecate it when every single connector in Apache Flink is
> migrated already?
>

Technically this is not a hard requirement. But I would suggest that we
should migrate all existing connectors so that we eat our own dogfood and
prove to users that SinkV2 is ready for use in production.


> In my opinion it is time to ask users to migrate their connectors.
> More importantly, @Deprecated would signal users not to build new
> connectors on SinkFunction. I would argue it's also very misleading to users
> to not mark SinkFunction @Deprecated given that it clearly will be deprecated.




> Cheers,
>
> Konstantin
>
>
> Am Mo., 6. Feb. 2023 um 13:26 Uhr schrieb Jark Wu :
>
> > I agree with Dong Lin.
> >
> > Oracle explains how to use Deprecate API [1]:
> >
> > You are strongly recommended to use the Javadoc @deprecated tag with
> > > appropriate comments explaining how to use the new API. This ensures
> > > developers will *have a workable migration path from the old API to the
> > > new API*.
> >
> >
> > From a user's perspective, the workable migration path is very important.
> > Otherwise, it blurs the semantics of API deprecation. The Flink API's
> > compatibility and stability issues in the past left a bad impression on
> the
> > downstream projects. We should be careful when changing and deprecating
> > APIs, especially when there are known migration gaps. I think it's a good
> > idea to migrate Flink-owned connectors before marking old API deprecated.
> > This ensures downstream projects can migrate to new APIs smoothly.
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> https://docs.oracle.com/javase/8/docs/technotes/guides/javadoc/deprecation/deprecation.html
> >
> > On Mon, 6 Feb 2023 at 10:01, Steven Wu  wrote:
> >
> > > Regarding the discussion on global committer [1] for sinks with global
> > > transactions, there is no consensus on solving that problem in SinkV2.
> > Will
> > > it require any breaking change in SinkV2?
> > >
> > > Also will SinkV1 be deprecated too? or it should happen sometime after
> > > SinkFunction deprecation?
> > >
> > > [1] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
> > >
> > > On Sun, Feb 5, 2023 at 2:14 AM Dong Lin  wrote:
> > >
> > > > Hi Konstantin,
> > > >
> > > > Thanks for the comment! Please see my comment inline.
> > > >
> > > > Cheers,
> > > > Dong
> > > >
> > > > On Sat, Feb 4, 2023 at 2:06 AM Konstantin Knauf 
> > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > sorry for joining the discussion late.
> > > > >
> > > > > 1) Is there an option to deprecate SinkFunction in Flink 1.17 while
> > > > leaving
> > > > > SinkV2 @PublicEvolving in Flink 1.17. We then aim to make SinkV2
> > > @Public
> > > > in
> > > > > and remove SinkFunction in Flink 1.18. @PublicEvolving are intended
> > for
> > > > > public use. So, I don't see it as a blocker for deprecating
> > > SinkFunction
> > > > > that we have to make SinkV2 @PublicEvovling. For reference this is
> > the
> > > > > description of @PublicEvovling:
> > > > >
> > > > > /**
> > > > >  * Annotation to mark classes and methods for public use, but with
> > > > > evolving interfaces.
> > > > >  *
> > > > >  * Classes and methods with this annotation are intended for
> > public
> > > > > use and have stable behavior.
> > > > > 

Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-02-06 Thread Piotr Nowojski
 Hi,

Thanks for the answers.

>> Are you proposing that all of the inputs to stateful operators would
have to be sorted?
>>
> Records in stream don't need to be sorted, but it should be managed by
`Timestamp Barrier`, which means
> 1. Records belonging to a specific `Timestamp Barrier` are disordered.
> 2. Computations in different timestamp barriers are ordered. For the above
> example, each stateful subtask can start computation for T2 only after it
> finishes computation for T1. Subtasks are independent of each other.

Wouldn't that add significant latency to processing the records? You would
basically introduce a batch processing concept in Flink?

Have you considered some alternative solutions? For example, letting each
operator/function/sink take care of the data disorder itself:
- stateless operators could completely ignore the issue and process the
records normally, as they are doing right now
- stateful operators should either:
  - process the records immediately, if the business doesn't require ordering
  - or buffer the records internally, like windowed/temporal operators are
currently doing. Non-windowed joins/aggregations could also work in a
similar manner, e.g. pre-aggregate data per each "epoch" (as demarcated by
timestamp barriers); a rough sketch of that idea follows below
- sink implementations would have to match what the external system supports:
  - if the external system requires ordered writes (something like a Kafka
topic?), the sink would have to buffer the writes until a "timestamp
barrier" arrives
  - some sinks might support writing data simultaneously to different
"epochs", for example writing files bucketed by each epoch; each
bucket/epoch could be committed independently
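
Regarding the buffering idea above, here is a minimal sketch in plain Java (not
actual Flink operator code; the `EpochBuffer` class and its methods are made up
purely to illustrate buffering per epoch and releasing on a barrier):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Buffers records per "epoch" and releases them when the matching timestamp barrier arrives. */
public class EpochBuffer<T> {

    private final Map<Long, List<T>> recordsPerEpoch = new HashMap<>();

    /** Called for each incoming record; the epoch would be derived from the record's timestamp. */
    public void add(long epoch, T record) {
        recordsPerEpoch.computeIfAbsent(epoch, e -> new ArrayList<>()).add(record);
    }

    /** Called when the timestamp barrier for the given epoch arrives; returns the buffered records. */
    public List<T> onBarrier(long epoch) {
        List<T> buffered = recordsPerEpoch.remove(epoch);
        return buffered != null ? buffered : Collections.emptyList();
    }
}
```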

This way, latency would be behaving very much like it currently does in
Flink. For example if we have a following streaming SQL:

INSERT INTO alerts_with_user SELECT * FROM alerts a, users u WHERE
a.user_id = u.id

If there is some lag in the users table, alerts would be still generated.
Downstream applications could process and react to newly generated
`alerts_with_user`, while at the same time, we could have a consistent view
across those three tables (users, alerts, alerts_with_user) if needed.

> I call the data of the timestamp barrier "committed" if the data
> is written to a table according to the barrier without a snapshot, and the
> data may be "rolled back" due to job failure. (sorry that the "committed"
> here may not be appropriate)

Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't
say that the timestamp barrier has been committed, but that all records for
a given "epoch" have been processed/written but not yet committed, so they
can still be rolled back?

> For example, when multiple jobs start at the same time and register
themselves in `MetaService`,
> it needs to serially check whether they write to the same table

Why do we need to do that? Only to disallow this? To forbid writing from
two jobs into a single table? If so, can we not push this responsibility
down to the connector? For example, sink/source operator coordinators could
negotiate with the respective external systems whether the given read/write is
allowed? So if there is a need for such a meta service, Flink doesn't need to
know about it?

Best,
Piotrek

pon., 6 lut 2023 o 10:44 Shammon FY  napisał(a):

> Hi Piotr,
>
> Thanks for your feedback. In general, I think `Timestamp Barrier` is a
> special `Watermark` that all sources send watermarks with the same
> timestamp as `Timestamp Barrier` and aggregation operators will align data
> by it. For example, all source subtasks are assigned two unified watermarks
> T1 and T2, T1 < T2. All records with timestamp <= T1 will be aligned by T1,
> and records with timestamp (T1, T2] will be aligned by T2.
>
> > Are you proposing that all of the inputs to stateful operators would have
> to be sorted?
>
> Records in stream don't need to be sorted, but it should be managed by
> `Timestamp Barrier`, which means
> 1. Records belonging to a specific `Timestamp Barrier` are disordered.
> 2. Computations in different timestamp barriers are ordered. For the above
> example, each stateful subtask can start computation for T2 only after it
> finishes computation for T1. Subtasks are independent of each other.
>
> > Can you explain why do you need those 3 states? Why can committed records
> be rolled back?
>
> Here I try to define the states of data in tables according to Timestamp
> Barrier and Snapshot, and I found that the 3 states are incomplete. For
> example, there is timestamp barrier T associated with checkpoint P, and
> sink operator will create snapshot S for P in tables. The data states in
> tables are as follows
> 1. Sink finishes writing data of timestamp barrier T to a table, but
> snapshot P is not created in the table and T is not finished in all tables.
> 2. Sink finishes writing data of timestamp barrier T to a table, creates
> snapshot P according to checkpoint C, but the T1 is not 

Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread Jing Ge
Hi,

We have the same goal of letting users migrate to SinkV2. Just marking those
interfaces as deprecated or public won't achieve this goal. Frankly
speaking, users will not start to migrate because those interfaces are
marked as deprecated. They will do it because there is a workable
implementation that can replace the old one, with a clear migration
guideline. What Dong Lin and Jark Wu said makes a lot of sense. This leads
to the "bottom-up" solution - first have workable (graduated, which does not
necessarily mean @Public, see below) connectors, then deprecate and
graduate the top-level interfaces.

Hi Konstantin,

Thanks for sharing. In general you are right, I'm with you: an interface and
its (sole) implementation should be graduated together. But SinkV2 is a
special case. It has more than 10 implementations, and it is a very
customer-facing API with a big impact on the ecosystem, so we should handle
it with special care. If we go this way, it will be even harder to graduate
the SinkV2 API, because many connector implementations would have to be
graduated together with the SinkV2 interfaces.

Speaking of the issue you mentioned, as far as I am concerned, the current
KafkaSink implementation [1] offers an option for this. Conceptually, it is
possible to mark KafkaSink as @Public without marking SinkV2 as @Public.
All potentially unstable methods coming from the SinkV2 interfaces would be
kept marked as @Internal. We could still make breaking changes to those
methods, i.e. to the SinkV2 interfaces (still @PublicEvolving), after
KafkaSink is graduated. For KafkaSink itself, it is also possible to make
further backward-compatible changes, like adding new methods, removing
@Internal (once the related SinkV2 interface is graduated), etc. The
graduation impact would be limited to the Kafka connector.

A slightly different option could be to make KafkaSink able to completely
replace FlinkKafkaProducer, keep it as @PublicEvolving, and remove
FlinkKafkaProducer. This could also be considered the (soft) graduation of
KafkaSink even if it is still @PublicEvolving. Users will not be confused
because there is only one Kafka sink solution (FlinkKafkaProducer is gone).

It's my belief that this is a more flexible solution with a smaller impact on
the ecosystem than directly marking top-level interfaces as @Deprecated or
@Public without showing users how to migrate, i.e. without concrete
graduated implementations.

Best regards,
Jing



[1]
https://github.com/apache/flink/blob/2ae5df278958073fee63b2bf824a53a28a21701b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java#L89


On Mon, Feb 6, 2023 at 2:56 PM Galen Warren  wrote:

> Recently, a critical bug with the Unified Sink committer was reported:  
> [FLINK-30238]
> Unified Sink committer does not clean up state on final savepoint - ASF
> JIRA (apache.org) .
>
> Fabian Paul reported:
>
> Hi folks,
>>
>> I did some initial investigation, and the problem seems twofold.
>>
>> If no post-commit topology is used, we do not run into a problem where
>> we could lose data but since we do not clean up the state correctly,
>> we will hit this [1] when trying to stop the pipeline with a savepoint
>> after we have started it from a savepoint.
>> AFAICT all two-phase commit sinks are affected Kafka, File etc.
>>
>> For sinks using the post-commit topology, the same applies.
>> Additionally, we might never do the commit from the post-commit
>> topology resulting in lost data.
>>
>> Best,
>> Fabian
>>
>
>
> Does this need to be addressed before people can safely move to V2 sinks?
> I'm using the StreamingFileSink for this reason.
>
> Thanks,
>
> Galen
>
>
>
>
>
> On Mon, Feb 6, 2023 at 8:48 AM Konstantin Knauf  wrote:
>
>> Hi Steven,
>>
>> Sink is already deprecated. It was deprecated at the moment where we
>> introduced SinkV2.
>>
>> Hi Jark, Hi Dong,
>>
>> My understanding is the SinkV2 is a workable interface. The most important
>> connectors have been migrated (Kafka, Filesystem) and more connectors
>> (OpenSearch, ElasticSearch, Kinesis) use it successfully. To make SinkV2
>> public, it does not need to have all possible functionality. Public APIs
>> can be extended. That's what we do all the time. There will also always be
>> bugs. So, these points can not be categorical blockers to promote the API.
>>
>> What are the specific issues/tickets that are blocking us? Can we in your
>> opinion only deprecate it when every single connector in Apache Flink is
>> migrated already?
>>
>> In my opinion it is time to ask users to migrate their connectors.
>> More importantly, @Deprecated would signal users not to build new
>> connectors on SinkFunction. I would argue it's also very misleading to
>> users
>> to not mark SinkFunction @Deprecated given that it clearly will be deprecated.
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> Am Mo., 6. Feb. 2023 um 13:26 Uhr schrieb Jark Wu :
>>
>> > I agree with Dong Lin.

[NOTICE] Website now has a staging environment

2023-02-06 Thread Chesnay Schepler

Hello,

Just so more people are aware of it, I recently enabled a staging 
environment for the Flink Website.


You can push the rebuilt website to the asf-staging branch in flink-web, 
and the changes will be visible at https://flink.staged.apache.org shortly.


This has been documented at 
https://cwiki.apache.org/confluence/display/FLINK/Website.



Currently a Hugo-based version of the Website (FLINK-22922) can be seen 
in the staging environment.



Cheers



Re: Reworking the Rescale API

2023-02-06 Thread Maximilian Michels
>> I fully agree that in-place scaling is a much harder problem which is out of 
>> the scope for now. My primary concern here is to be able to rescale with 
>> upfront reservation of resources before restarting the job, so the job 
>> doesn't get stuck in case of resource constraints.
> Not sure I follow. The AS only rescales when it has already acquired the 
> slots that it needs.

I'm saying that the primary objective of this thread is to figure out
upfront reservation of resources as part of a new Rescale API. The
adaptive scheduler is a (very reasonable) means to an end to fulfil
this property. If we were to go with another solution because the
adaptive scheduler does not prove to be production ready, then we
would still have to make sure that property holds. I'm going to experiment
a bit with the adaptive scheduler to see if there are any other
limitations.

As for the slot sharing groups with different maximum parallelism, I
see what the issue is here:
https://github.com/apache/flink/blob/2ae5df278958073fee63b2bf824a53a28a21701b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java#L97
Should be fixable. I've filed a JIRA here:
https://issues.apache.org/jira/browse/FLINK-30931

-Max

On Fri, Feb 3, 2023 at 10:13 AM Chesnay Schepler  wrote:
>
> > My primary concern here is to be able to rescale with upfront reservation 
> > of resources before restarting the job, so the job doesn't get stuck in 
> > case of resource constraints.
>
> Not sure I follow. The AS only rescales when it has already acquired the 
> slots that it needs.
>
>  > This is a blocker from my side. Why do we have that restriction?
>
> We just didn't bother fixing it initially. It should be easy to fix.
>
> On 02/02/2023 18:29, Maximilian Michels wrote:
> > I fully agree that in-place scaling is a much harder problem which is
> > out of the scope for now. My primary concern here is to be able to
> > rescale with upfront reservation of resources before restarting the
> > job, so the job doesn't get stuck in case of resource constraints.
> >
> >> Unused slots: If the max parallelism for slot sharing groups is not equal, 
> >> slots offered to Adaptive Scheduler might be unused.
> > From: 
> > https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/#limitations-1
> >
> > This is a blocker from my side. Why do we have that restriction?
> >
> > On Thu, Feb 2, 2023 at 5:03 PM Chesnay Schepler  wrote:
> >>   > If I understand correctly, the adaptive scheduler currently does a
> >> full job restart. Is there any work planned to enable in-place rescaling
> >> with the adaptive scheduler?
> >>
> >> Nothing concrete. Sure, it's on a wishlist, but it'd require significant
> >> changes to how the runtime works.
> >> Rescaling stateful operators requires keygroups to be redistributed,
> >> you'd need to be able to change task edges dynamically, roll-back to a
> >> checkpoint without restarting tasks, ...
> >>
> >> It's less of a scheduler thing actually.
> >>
> >> An earlier step to that would be to allow recovery from an error without
> >> restarting all tasks, which would benefit all schedulers.
> >> But again bit of a moonshot.
> >>
> >>   > How well has the adaptive scheduler been tested in production? If we
> >> are intending to use it for rescale operations, I'm a bit concerned
> >> those jobs might show different behavior due to the scheduling than jobs
> >> started with the default scheduler.
> >>
> >> I don't think we got a lot of feedback so far.
> >> Outside of the limitations listed on the elastic scaling page (which I
> >> believe we'll address in due time) I'm not aware of any problems.
> >> We haven't run into any issues internally.
> >>
> >> On 02/02/2023 12:44, Maximilian Michels wrote:
> >>> +1 on improving the scheduler docs.
> >>>
>  They never shared a base class since day 1. Are you maybe mixing up the 
>  AdaptiveScheduler and AdaptiveBatchScheduler?
> >>> @Chesnay: Indeed, I had mixed this up. DefaultScheduler and
> >>> AdaptiveScheduler only share the SchedulerNG interface while the
> >>> DefaultScheduler and the AdaptiveBatchScheduler share a subset of the
> >>> code. Too many schedulers :)
> >>>
> >>> Thanks for clarifying the current and the intended feature set of the
> >>> adaptive scheduler!
> >>>
> >>> How well has the adaptive scheduler been tested in production? If we
> >>> are intending to use it for rescale operations, I'm a bit concerned
> >>> those jobs might show different behavior due to the scheduling than
> >>> jobs started with the default scheduler.
> >>>
> >>> If I understand correctly, the adaptive scheduler currently does a
> >>> full job restart. Is there any work planned to enable in-place
> >>> rescaling with the adaptive scheduler?
> >>>
>  @max:
>  - when user repartition, we still need to restart the job, can we 
>  try to
>  do this part of the work internally instead of 

[jira] [Created] (FLINK-30931) Adaptive scheduler wastes slots for multiple slot sharing groups with different max parallelism

2023-02-06 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-30931:
--

 Summary: Adaptive scheduler wastes slots for multiple slot sharing 
groups with different max parallelism
 Key: FLINK-30931
 URL: https://issues.apache.org/jira/browse/FLINK-30931
 Project: Flink
  Issue Type: Bug
Reporter: Maximilian Michels
 Fix For: 1.17.0


The adaptive scheduler assumes all slot sharing groups have the same maximum 
parallelism, which can lead to allocating too many slots for smaller slot 
sharing groups.

See 
https://github.com/apache/flink/blob/2ae5df278958073fee63b2bf824a53a28a21701b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java#L97



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-06 Thread Timo Walther

Hi Feng,

this is indeed a good proposal.

1) It makes sense to improve the catalog listing for platform providers.

2) Other feedback from the past has shown that users would like to avoid 
the default in-memory catalog and offer their catalog before a 
TableEnvironment session starts.


3) Also we might reconsider whether a default catalog and default 
database make sense. Or whether this can be disabled and SHOW CATALOGS 
can be used for listing first without having a default catalog.


What do you think about option 2 and 3?

In any case, I would propose we pass a CatalogProvider to 
EnvironmentSettings and only allow a single instance. Catalogs should 
never shadow other catalogs.


We could also use the org.apache.flink.table.factories.Factory infra and 
allow catalog providers via pure string properties. Not sure if we need 
this in the first version though.


Cheers,
Timo


On 06.02.23 11:21, Feng Jin wrote:

Hi everyone,

The original discussion address is
https://issues.apache.org/jira/browse/FLINK-30126

Currently, Flink has access to many systems, including kafka, hive,
iceberg, hudi, elasticsearch, mysql...  The corresponding catalog names
might be:
kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
mysql_database2_

As the platform for Flink SQL jobs, we need to maintain the meta
information of each of the company's systems, and when a Flink job
starts, we need to register the catalogs with the Flink table
environment so that users can use any table through the
env.executeSql interface.

When we only have a small number of catalogs, we can register them like
this, but when there are thousands of catalogs, I think there
needs to be a dynamic loading mechanism so that we can register catalogs
when needed, speed up the initialization of the table environment, and
avoid registering catalogs that are never used.

Preliminary thoughts:

A new CatalogProvider interface can be added.
It contains two methods:
* listCatalogs(), which lists all the catalogs that the provider can provide
* getCatalog(), which returns a catalog instance for a given catalog name.

```java
public interface CatalogProvider {

    default void initialize(ClassLoader classLoader, ReadableConfig config) {}

    Optional<Catalog> getCatalog(String catalogName);

    Set<String> listCatalogs();
}
```


The corresponding implementation in CatalogManager is as follows:

```java
public class CatalogManager {

    private @Nullable CatalogProvider catalogProvider;

    private Map<String, Catalog> catalogs;

    public void setCatalogProvider(CatalogProvider catalogProvider) {
        this.catalogProvider = catalogProvider;
    }

    public Optional<Catalog> getCatalog(String catalogName) {
        // Catalogs registered in tableEnv take precedence.
        if (catalogs.containsKey(catalogName)) {
            return Optional.of(catalogs.get(catalogName));
        }
        // If there is no corresponding catalog in catalogs,
        // get the catalog from the catalogProvider.
        if (catalogProvider != null) {
            return catalogProvider.getCatalog(catalogName);
        }
        return Optional.empty();
    }
}
```
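
For illustration, a toy provider implementation against the proposed interface
could look as follows (only a sketch; a `Supplier`-backed map stands in for the
platform's real metadata service, and the class name is made up):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.flink.table.catalog.Catalog;

/** Toy CatalogProvider that creates catalogs lazily, only when they are referenced. */
public class MapBackedCatalogProvider implements CatalogProvider {

    private final Map<String, Supplier<Catalog>> factories = new HashMap<>();

    public MapBackedCatalogProvider register(String name, Supplier<Catalog> factory) {
        factories.put(name, factory);
        return this;
    }

    @Override
    public Optional<Catalog> getCatalog(String catalogName) {
        // The catalog instance is only created when a query actually references it.
        return Optional.ofNullable(factories.get(catalogName)).map(Supplier::get);
    }

    @Override
    public Set<String> listCatalogs() {
        return factories.keySet();
    }
}
```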



Possible problems:

1. Catalog name conflicts: how do we choose when a registered catalog
and a catalog provided by the catalog-provider conflict?
I prefer tableEnv-registered ones over catalogs provided by the
catalog-provider. If the user wishes to reference the catalog provided
by the catalog-provider, they can unregister the catalog in tableEnv
through the `unregisterCatalog` interface.

2. Number of CatalogProviders: is it possible to have multiple
CatalogProvider implementations?
I don't have a good idea about this at the moment. If multiple
CatalogProviders are supported, it brings more convenience, but
there may be catalog name conflicts between different
CatalogProviders.



Looking forward to your reply, any feedback is appreciated!


Best.

Feng Jin





Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread Galen Warren
Recently, a critical bug with the Unified Sink committer was reported:
 [FLINK-30238]
Unified Sink committer does not clean up state on final savepoint - ASF
JIRA (apache.org) .

Fabian Paul reported:

Hi folks,
>
> I did some initial investigation, and the problem seems twofold.
>
> If no post-commit topology is used, we do not run into a problem where
> we could lose data but since we do not clean up the state correctly,
> we will hit this [1] when trying to stop the pipeline with a savepoint
> after we have started it from a savepoint.
> AFAICT all two-phase commit sinks are affected Kafka, File etc.
>
> For sinks using the post-commit topology, the same applies.
> Additionally, we might never do the commit from the post-commit
> topology resulting in lost data.
>
> Best,
> Fabian
>


Does this need to be addressed before people can safely move to V2 sinks?
I'm using the StreamingFileSink for this reason.

Thanks,

Galen





On Mon, Feb 6, 2023 at 8:48 AM Konstantin Knauf  wrote:

> Hi Steven,
>
> Sink is already deprecated. It was deprecated at the moment where we
> introduced SinkV2.
>
> Hi Jark, Hi Dong,
>
> My understanding is the SinkV2 is a workable interface. The most important
> connectors have been migrated (Kafka, Filesystem) and more connectors
> (OpenSearch, ElasticSearch, Kinesis) use it successfully. To make SinkV2
> public, it does not need to have all possible functionality. Public APIs
> can be extended. That's what we do all the time. There will also always be
> bugs. So, these points can not be categorical blockers to promote the API.
>
> What are the specific issues/tickets that are blocking us? Can we in your
> opinion only deprecate it when every single connector in Apache Flink is
> migrated already?
>
> In my opinion it is time to ask users to migrate their connectors.
> More importantly, @Deprecated would signal users not to build new
> connectors on SinkFunction. I would argue it's also very misleading to users
> to not mark SinkFunction @Deprecated given that it clearly will be deprecated.
>
> Cheers,
>
> Konstantin
>
>
> Am Mo., 6. Feb. 2023 um 13:26 Uhr schrieb Jark Wu :
>
> > I agree with Dong Lin.
> >
> > Oracle explains how to use Deprecate API [1]:
> >
> > You are strongly recommended to use the Javadoc @deprecated tag with
> > > appropriate comments explaining how to use the new API. This ensures
> > > developers will *have a workable migration path from the old API to the
> > > new API*.
> >
> >
> > From a user's perspective, the workable migration path is very important.
> > Otherwise, it blurs the semantics of API deprecation. The Flink API's
> > compatibility and stability issues in the past left a bad impression on
> the
> > downstream projects. We should be careful when changing and deprecating
> > APIs, especially when there are known migration gaps. I think it's a good
> > idea to migrate Flink-owned connectors before marking old API deprecated.
> > This ensures downstream projects can migrate to new APIs smoothly.
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> https://docs.oracle.com/javase/8/docs/technotes/guides/javadoc/deprecation/deprecation.html
> >
> > On Mon, 6 Feb 2023 at 10:01, Steven Wu  wrote:
> >
> > > Regarding the discussion on global committer [1] for sinks with global
> > > transactions, there is no consensus on solving that problem in SinkV2.
> > Will
> > > it require any breaking change in SinkV2?
> > >
> > > Also will SinkV1 be deprecated too? or it should happen sometime after
> > > SinkFunction deprecation?
> > >
> > > [1] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
> > >
> > > On Sun, Feb 5, 2023 at 2:14 AM Dong Lin  wrote:
> > >
> > > > Hi Konstantin,
> > > >
> > > > Thanks for the comment! Please see my comment inline.
> > > >
> > > > Cheers,
> > > > Dong
> > > >
> > > > On Sat, Feb 4, 2023 at 2:06 AM Konstantin Knauf 
> > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > sorry for joining the discussion late.
> > > > >
> > > > > 1) Is there an option to deprecate SinkFunction in Flink 1.17 while
> > > > leaving
> > > > > SinkV2 @PublicEvolving in Flink 1.17. We then aim to make SinkV2
> > > @Public
> > > > in
> > > > > and remove SinkFunction in Flink 1.18. @PublicEvolving are intended
> > for
> > > > > public use. So, I don't see it as a blocker for deprecating
> > > SinkFunction
> > > > > that we have to make SinkV2 @PublicEvovling. For reference this is
> > the
> > > > > description of @PublicEvovling:
> > > > >
> > > > > /**
> > > > >  * Annotation to mark classes and methods for public use, but with
> > > > > evolving interfaces.
> > > > >  *
> > > > >  * Classes and methods with this annotation are intended for
> > public
> > > > > use and have stable behavior.
> > > > >  * However, their interfaces and signatures are not considered to
> be
> > > > > stable and might be changed
> > > > >  * across versions.
> > > > >  *
> > > > >  * This 

Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread Konstantin Knauf
Hi Steven,

Sink is already deprecated. It was deprecated at the moment where we
introduced SinkV2.

Hi Jark, Hi Dong,

My understanding is the SinkV2 is a workable interface. The most important
connectors have been migrated (Kafka, Filesystem) and more connectors
(OpenSearch, ElasticSearch, Kinesis) use it successfully. To make SinkV2
public, it does not need to have all possible functionality. Public APIs
can be extended. That's what we do all the time. There will also always be
bugs. So, these points can not be categorical blockers to promote the API.

What are the specific issues/tickets that are blocking us? Can we in your
opinion only deprecate it when every single connector in Apache Flink is
migrated already?

In my opinion it is time to ask users to migrate their connectors.
More importantly, @Deprecated would signal users not to build new
connectors on SinkFunction. I would argue it's also very misleading to users
to not mark SinkFunction @Deprecated given that it clearly will be deprecated.

Cheers,

Konstantin


Am Mo., 6. Feb. 2023 um 13:26 Uhr schrieb Jark Wu :

> I agree with Dong Lin.
>
> Oracle explains how to use Deprecate API [1]:
>
> You are strongly recommended to use the Javadoc @deprecated tag with
> > appropriate comments explaining how to use the new API. This ensures
> > developers will *have a workable migration path from the old API to the
> > new API*.
>
>
> From a user's perspective, the workable migration path is very important.
> Otherwise, it blurs the semantics of API deprecation. The Flink API's
> compatibility and stability issues in the past left a bad impression on the
> downstream projects. We should be careful when changing and deprecating
> APIs, especially when there are known migration gaps. I think it's a good
> idea to migrate Flink-owned connectors before marking old API deprecated.
> This ensures downstream projects can migrate to new APIs smoothly.
>
> Best,
> Jark
>
> [1]:
>
> https://docs.oracle.com/javase/8/docs/technotes/guides/javadoc/deprecation/deprecation.html
>
> On Mon, 6 Feb 2023 at 10:01, Steven Wu  wrote:
>
> > Regarding the discussion on global committer [1] for sinks with global
> > transactions, there is no consensus on solving that problem in SinkV2.
> Will
> > it require any breaking change in SinkV2?
> >
> > Also will SinkV1 be deprecated too? or it should happen sometime after
> > SinkFunction deprecation?
> >
> > [1] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
> >
> > On Sun, Feb 5, 2023 at 2:14 AM Dong Lin  wrote:
> >
> > > Hi Konstantin,
> > >
> > > Thanks for the comment! Please see my comment inline.
> > >
> > > Cheers,
> > > Dong
> > >
> > > On Sat, Feb 4, 2023 at 2:06 AM Konstantin Knauf 
> > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > sorry for joining the discussion late.
> > > >
> > > > 1) Is there an option to deprecate SinkFunction in Flink 1.17 while
> > > leaving
> > > > SinkV2 @PublicEvolving in Flink 1.17. We then aim to make SinkV2
> > @Public
> > > in
> > > > and remove SinkFunction in Flink 1.18. @PublicEvolving are intended
> for
> > > > public use. So, I don't see it as a blocker for deprecating
> > SinkFunction
> > > > that we have to make SinkV2 @PublicEvovling. For reference this is
> the
> > > > description of @PublicEvovling:
> > > >
> > > > /**
> > > >  * Annotation to mark classes and methods for public use, but with
> > > > evolving interfaces.
> > > >  *
> > > >  * Classes and methods with this annotation are intended for
> public
> > > > use and have stable behavior.
> > > >  * However, their interfaces and signatures are not considered to be
> > > > stable and might be changed
> > > >  * across versions.
> > > >  *
> > > >  * This annotation also excludes methods and classes with evolving
> > > > interfaces / signatures within
> > > >  * classes annotated with {@link Public}.
> > > >  */
> > > >
> > > >
> > > > Marking SinkFunction @Deprecated would already single everyone to
> move
> > to
> > > > SinkV2, which we as a community, I believe, have a strong interest
> in.
> > > Its
> > > >
> > >
> > > Yes, I also believe we all have this strong interest. I just hope that
> > this
> > > can be done in the best possible way that does not confuse users.
> > >
> > > I probably still have the same concern regarding its impact on users:
> if
> > we
> > > mark an API as deprecated, it effectively means the users of this API
> > > should start to migrate to another API (e.g. SinkV2) and we might
> remove
> > > this API in the future. However, given that we know there are known
> > > problems preventing users from doing so, it seems that we are not ready
> > to
> > > send this message to users right.
> > >
> > > If I understand correctly, I guess you are suggesting that by marking
> > > SinkFunction as deprecated, we can put higher pressure on Flink
> > > contributors to update the existing Flink codebase to improve and use
> > > SinkV2.
> > >
> > > I am not sure this is the right way to 

Re: [ANNOUNCE] Apache Flink 1.16.1 released

2023-02-06 Thread Etienne Chauchot

Hi,

Thanks to everyone involved.

Best

Etienne

Le 02/02/2023 à 03:55, weijie guo a écrit :

Thanks to Martijn for managing the release and to all the people involved.


Best regards,

Weijie


Konstantin Knauf  于2023年2月2日周四 06:40写道:


Great. Thanks, Martijn for managing the release.

Am Mi., 1. Feb. 2023 um 20:26 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:


The Apache Flink community is very happy to announce the release of

Apache

Flink 1.16.1, which is the first bugfix release for the Apache Flink 1.16
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data

streaming

applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the

improvements

for this bugfix release:
https://flink.apache.org/news/2023/01/30/release-1.16.1.html

The full release notes are available in Jira:



https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352344

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Feel free to reach out to the release managers (or respond to this

thread)

with feedback on the release process. Our goal is to constantly improve

the

release process. Feedback on what could be improved or things that didn't
go so well are appreciated.

Best regards,

Martijn Visser



--
https://twitter.com/snntrable
https://github.com/knaufk



[jira] [Created] (FLINK-30930) Automatically determine Flink binary download URL from version

2023-02-06 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-30930:


 Summary: Automatically determine Flink binary download URL from 
version
 Key: FLINK-30930
 URL: https://issues.apache.org/jira/browse/FLINK-30930
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System / CI
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30929) Add helpful message when ElasticsearchSink.Builder.build() throws an IllegalArgumentException.

2023-02-06 Thread Kenny Wu (Jira)
Kenny Wu created FLINK-30929:


 Summary: Add helpful message when 
ElasticsearchSink.Builder.build() throws an IllegalArgumentException.
 Key: FLINK-30929
 URL: https://issues.apache.org/jira/browse/FLINK-30929
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Affects Versions: 1.13.6
Reporter: Kenny Wu
 Fix For: 1.13.6


Recently, I have been trying to use flink-connector-elasticsearch in my project, and I 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30928) Use temurin JRE in test hadoop docker image

2023-02-06 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-30928:
-

 Summary: Use temurin JRE in test hadoop docker image
 Key: FLINK-30928
 URL: https://issues.apache.org/jira/browse/FLINK-30928
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Affects Versions: 1.17.0
Reporter: Gabor Somogyi


The OpenJDK Docker image is deprecated, please see: https://hub.docker.com/_/openjdk



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30927) Several tests fail with two non-abstract methods have the same parameter types, declaring type and return type

2023-02-06 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-30927:
---

 Summary: Several tests fail with two non-abstract methods  have 
the same parameter types, declaring type and return type
 Key: FLINK-30927
 URL: https://issues.apache.org/jira/browse/FLINK-30927
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin


e.g. 
org.apache.flink.table.planner.runtime.stream.sql.MatchRecognizeITCase#testUserDefinedFunctions

 

it seems that during code splitting it starts generating methods with the same 
signature

 

{noformat}

org.codehaus.janino.InternalCompilerException: Compiling 
"MatchRecognizePatternProcessFunction$77": Two non-abstract methods "default 
void MatchRecognizePatternProcessFunction$77.processMatch_0(java.util.Map, 
org.apache.flink.cep.functions.PatternProcessFunction$Context, 
org.apache.flink.util.Collector) throws java.lang.Exception" have the same 
parameter types, declaring type and return type

{noformat}

 

Probably could be a side effect of 
https://issues.apache.org/jira/browse/FLINK-27246



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-06 Thread Jark Wu
I agree with Dong Lin.

Oracle explains how to use Deprecate API [1]:

You are strongly recommended to use the Javadoc @deprecated tag with
> appropriate comments explaining how to use the new API. This ensures
> developers will *have a workable migration path from the old API to the
> new API*.


From a user's perspective, the workable migration path is very important.
Otherwise, it blurs the semantics of API deprecation. The Flink API's
compatibility and stability issues in the past left a bad impression on the
downstream projects. We should be careful when changing and deprecating
APIs, especially when there are known migration gaps. I think it's a good
idea to migrate Flink-owned connectors before marking old API deprecated.
This ensures downstream projects can migrate to new APIs smoothly.
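
For illustration, the pattern Oracle describes looks roughly like this (the
names are placeholders, not the actual Flink classes):

```java
/**
 * Writes records to an external system.
 *
 * @deprecated Use {@code NewSinkApi} instead; see the migration guide for how
 *     to port existing implementations before this interface is removed.
 */
@Deprecated
public interface OldSinkApi<T> {
    void write(T record) throws Exception;
}
```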

Best,
Jark

[1]:
https://docs.oracle.com/javase/8/docs/technotes/guides/javadoc/deprecation/deprecation.html

On Mon, 6 Feb 2023 at 10:01, Steven Wu  wrote:

> Regarding the discussion on global committer [1] for sinks with global
> transactions, there is no consensus on solving that problem in SinkV2. Will
> it require any breaking change in SinkV2?
>
> Also will SinkV1 be deprecated too? or it should happen sometime after
> SinkFunction deprecation?
>
> [1] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
>
> On Sun, Feb 5, 2023 at 2:14 AM Dong Lin  wrote:
>
> > Hi Konstantin,
> >
> > Thanks for the comment! Please see my comment inline.
> >
> > Cheers,
> > Dong
> >
> > On Sat, Feb 4, 2023 at 2:06 AM Konstantin Knauf 
> wrote:
> >
> > > Hi everyone,
> > >
> > > sorry for joining the discussion late.
> > >
> > > 1) Is there an option to deprecate SinkFunction in Flink 1.17 while
> > leaving
> > > SinkV2 @PublicEvolving in Flink 1.17. We then aim to make SinkV2
> @Public
> > in
> > > and remove SinkFunction in Flink 1.18. @PublicEvolving are intended for
> > > public use. So, I don't see it as a blocker for deprecating
> SinkFunction
> > > that we have to make SinkV2 @PublicEvovling. For reference this is the
> > > description of @PublicEvovling:
> > >
> > > /**
> > >  * Annotation to mark classes and methods for public use, but with
> > > evolving interfaces.
> > >  *
> > >  * Classes and methods with this annotation are intended for public
> > > use and have stable behavior.
> > >  * However, their interfaces and signatures are not considered to be
> > > stable and might be changed
> > >  * across versions.
> > >  *
> > >  * This annotation also excludes methods and classes with evolving
> > > interfaces / signatures within
> > >  * classes annotated with {@link Public}.
> > >  */
> > >
> > >
> > > Marking SinkFunction @Deprecated would already single everyone to move
> to
> > > SinkV2, which we as a community, I believe, have a strong interest in.
> > Its
> > >
> >
> > Yes, I also believe we all have this strong interest. I just hope that
> this
> > can be done in the best possible way that does not confuse users.
> >
> > I probably still have the same concern regarding its impact on users: if
> we
> > mark an API as deprecated, it effectively means the users of this API
> > should start to migrate to another API (e.g. SinkV2) and we might remove
> > this API in the future. However, given that we know there are known
> > problems preventing users from doing so, it seems that we are not ready
> to
> > send this message to users right.
> >
> > If I understand correctly, I guess you are suggesting that by marking
> > SinkFunction as deprecated, we can put higher pressure on Flink
> > contributors to update the existing Flink codebase to improve and use
> > SinkV2.
> >
> > I am not sure this is the right way to use @deprecated, which has a
> > particular meaning for its users rather than contributors. And I am also
> > not sure we can even pressure contributors of an open-source project into
> > developing a feature (e.g. migrate all existing SinkFunction subclasses
> to
> > SinkV2). IMO, the typical way is for the contributor with interest/time
> to
> > work on the feature, or talk to other contributors whether they are
> willing
> > to collaborate/work on this, rather than pressuring other contributors
> into
> > working on this.
> >
> >
> > almost comical how long the transition from SourceFurnction/SinkFunction
> to
> > > Source/Sink takes us. At the same time, we leave ourselves the option
> to
> > to
> > > make small changes to SinkV2 if any problems arise during the migration
> > of
> > > these connector.
> > >
> > > I think, we have a bit of a chicken/egg problem here. The pressure for
> > >
> >
> > Similar to the reason described above, I am not sure we have a
> chicken/egg
> > problem here. The issue here is that SinkV2 is not ready and we have a
> lot
> > of existing SinkFunction that is not migrated by ourselves. We (Flink
> > contributors) probably do not need to mark SinkFunction as deprecated in
> > order to address these issues in our own codebase.
> >
> >
> > users and contributors 

[jira] [Created] (FLINK-30926) [Umbrella] Test Flink Release 1.17

2023-02-06 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-30926:
-

 Summary: [Umbrella] Test Flink Release 1.17
 Key: FLINK-30926
 URL: https://issues.apache.org/jira/browse/FLINK-30926
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.17.0
Reporter: Qingsheng Ren
 Fix For: 1.17.0


This is an umbrella ticket for the Flink 1.17 testing efforts. Please prepare 
for the release testing by creating child testing tasks for the new features.

Tickets for testing tasks should be opened with:
 * Priority: Blocker
 * FixVersion: 1.17.0
 * Label: release-testing

In the meantime, please update the column value of `X-team verified` in the
[1.17 Release Wiki 
page|https://cwiki.apache.org/confluence/display/FLINK/1.17+Release].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30925) Add docs for the SQL Client gateway mode

2023-02-06 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-30925:
-

 Summary: Add docs for the SQL Client gateway mode
 Key: FLINK-30925
 URL: https://issues.apache.org/jira/browse/FLINK-30925
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Table SQL / Client
Affects Versions: 1.17.0
Reporter: Shengkai Fang
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Flink Kubernetes Operator 1.4.0 release planning

2023-02-06 Thread Gyula Fóra
Hi Devs!

Based on the previously agreed upon release schedule (
https://cwiki.apache.org/confluence/display/FLINK/Release+Schedule+and+Planning)
it is almost time for the 1.4.0 release.

There are still a number of smaller but important PRs open for some
critical fixes. I would like to merge those in the next 1-2 days and I
suggest we make the release cut on Wednesday/Thursday.

After that we should spend some time testing the release candidate and
hopefully we can finalize the release next week!

I volunteer as the release manager.

Cheers,
Gyula


[jira] [Created] (FLINK-30924) Conversion issues between timestamp and bigint

2023-02-06 Thread Feng Jin (Jira)
Feng Jin created FLINK-30924:


 Summary: Conversion issues between timestamp and bigint
 Key: FLINK-30924
 URL: https://issues.apache.org/jira/browse/FLINK-30924
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.16.1
Reporter: Feng Jin


When using CAST to convert between timestamp and bigint, the following exception 
is thrown: 
{code:java}
org.apache.flink.table.api.ValidationException: The cast from NUMERIC type to 
TIMESTAMP type is not allowed. It's recommended to use 
TO_TIMESTAMP(FROM_UNIXTIME(numeric_col)) instead, note the numeric is in 
seconds.

{code}
However, the FROM_UNIXTIME function uses the local time zone for the 
conversion, while the TO_TIMESTAMP function does not use the local time zone 
but the UTC time zone, so the actual result ends up being wrong.

 

The following is an example of the results of the test
{code:java}

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> select TO_TIMESTAMP(FROM_UNIXTIME(0));

// result 
                 EXPR$0
 1970-01-01 08:00:00.000

{code}
  

 

UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) has the same problem. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30923) Provide single script for installing Hugo

2023-02-06 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30923:
-

 Summary: Provide single script for installing Hugo
 Key: FLINK-30923
 URL: https://issues.apache.org/jira/browse/FLINK-30923
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.17.0
Reporter: Matthias Pohl


Currently, we have multiple locations where Hugo is installed. In the past this 
caused different Hugo versions being used for building the docs, depending on 
which script triggered the docs build.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30922) SQL validate fail to parse writing metadata

2023-02-06 Thread tanjialiang (Jira)
tanjialiang created FLINK-30922:
---

 Summary: SQL validate fail to parse writing metadata
 Key: FLINK-30922
 URL: https://issues.apache.org/jira/browse/FLINK-30922
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: tanjialiang


When I tried a simple demo SQL that writes metadata to Kafka in the Flink SQL 
client
{code:java}
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
 

it will throw an error
{code:java}
org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
Source) ~[?:?]
        at 
org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
 [flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
 [flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
 [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
[flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
[flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
[flink-sql-client-1.16.1.jar:1.16.1]
Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
failed. From line 1, column 33 to line 1, column 34: Unknown target column 'ts'
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
 ~[?:?]
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
 ~[?:?]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
 ~[?:?]
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) 
~[?:?]
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        ... 13 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 33 to line 1, column 34: Unknown target column 'ts'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) ~[?:1.8.0_41]
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[?:1.8.0_41]
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_41]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
~[?:1.8.0_41]
        at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) 
~[?:?]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) 
~[?:?]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) 
~[?:?]
        at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:401)
 ~[?:?]
        at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:389)
 ~[?:?]
        at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:172)
 ~[?:?]
    

RE: Re: Confluent Avro and Debezium formats - default schema name can be incompatible with registered schema name

2023-02-06 Thread Fruzsina Nagy
Hi Dawid,
Thanks for the suggestion, it’s worth a try. I’ll have a look at it.
I assume this ’schema’ option would not be required and the current logic with 
the default name ‘record’ would be used, if the schema is not provided 
explicitly.
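
For illustration, a rough sketch of how the proposed option could look from the Table API (hypothetical: the `avro-confluent.schema` key does not exist yet, and the broker/registry endpoints are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroConfluentSchemaOptionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical use of the 'avro-confluent.schema' option proposed in this thread:
        // the full Avro schema (including its name "sampleRecord") would be validated
        // against the table schema and used instead of the generated one named "record".
        tEnv.executeSql(
                "CREATE TABLE newtesttopic (\n"
                        + "  my_field1 INT NOT NULL,\n"
                        + "  my_field2 DOUBLE NOT NULL,\n"
                        + "  my_field3 STRING NOT NULL\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'newtesttopic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'avro-confluent',\n"
                        + "  'avro-confluent.url' = 'http://localhost:8081',\n"
                        + "  'avro-confluent.schema' = '{\"type\":\"record\","
                        + "\"name\":\"sampleRecord\",\"namespace\":\"com.mycorp.mynamespace\","
                        + "\"fields\":[{\"name\":\"my_field1\",\"type\":\"int\"},"
                        + "{\"name\":\"my_field2\",\"type\":\"double\"},"
                        + "{\"name\":\"my_field3\",\"type\":\"string\"}]}'\n"
                        + ")");
    }
}
```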
Best regards,
Fruzsina

On 2023/01/27 13:14:39 Dawid Wysakowicz wrote:
> Hi Fruzsina,
> 
> I think this is a valid issue we should try to solve. A different 
> approach I am thinking about is that we could actually add an option to 
> provide an entire avro schema to use. Something like: 
> `avro-confluent.schema` which we would validate it maps properly to the 
> schema of the table (that is names of fields and their types match) and 
> use it instead of the generated one.
> 
> What do you think about that approach?
> 
> Best,
> 
> Dawid
> 
> On 26/01/2023 11:29, Fruzsina Nagy wrote:
> > Hi everyone,
> >
> > I have come across the below issue, while experimenting with the Confluent 
> > registry and avro-confluent, debezium-avro-confluent formats. Please let me 
> > know your thoughts on it. Should this issue be addressed?
> >
> > Thanks in advance,
> > Fruzsina
> > The use case
> >
> > Create a new topic on Confluent Cloud
> > Create a value schema with the name “sampleRecord”:
> > {
> >"type": "record",
> >"namespace": "com.mycorp.mynamespace",
> >"name": "sampleRecord",
> > …}
> > Create table with “avro-confluent” format:
> > CREATE TABLE `newtesttopic` (
> >   `my_field1` INT NOT NULL,
> >   `my_field2` DOUBLE NOT NULL,
> >   `my_field3` VARCHAR(2147483647) NOT NULL
> >   ) WITH (
> >   'connector' = 'kafka',
> >   'topic' = 'newtesttopic',
> >   'scan.startup.mode' = 'latest-offset',
> >   'properties.bootstrap.servers' = 'bootstrapServers',
> >   'properties.sasl.jaas.config' = 'saslJaasConfig',
> >   'properties.sasl.mechanism' = 'PLAIN',
> >   'properties.security.protocol' = 'SASL_SSL',
> >   'format' = 'avro-confluent',
> >   'avro-confluent.url' = 'confluentSchemaRegUrl',
> >   'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
> >   'avro-confluent.basic-auth.user-info' = 'user:pw')
> >
> > Insert data into the “newtesttopic”
> > The following error is thrown:
> > io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> >  Schema being registered is incompatible with an earlier schema for subject 
> > "newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH, 
> > location:/name, message:expected: com.mycorp.mynamespace.sampleRecord, 
> > reader:{"type":"record","name":"record",...}, 
> > writer:{"type":"record","name":"sampleRecord",...}
> > This error of course can be avoided if we don’t register a schema for our 
> > topic on the Confluent Cloud site before inserting data into the kafka 
> > table, and we just let Flink register it for us with the name “record”.
> >
> > The cause of the error
> >
> > I found that the error is caused by the 
> > EncodingFormat<SerializationSchema<RowData>> created by 
> > RegistryAvroFormatFactory.createEncodingFormat, because when creating a 
> > AvroRowDataSerializationSchema, it uses 
> > AvroSchemaConverter.convertToSchema(LogicalType schema) 
> > 
> > which names the schema “record” 
> > 
> >  by default.
> >
> > But the registered schema is named “sampleRecord” in the above example, so 
> > the Confluent Schema Registry client doesn’t accept it.
> > The problem
> >
> > To resolve this I added a new option “schema-name” to “avro-confluent” and 
> > “debezium-avro-confluent” formats. And as I was testing the 
> > “debezium-avro-confluent” format, it turned out that this solution doesn’t 
> > solve the problem in those cases when there are named schemas (record, 
> > enum, fixed types) nested in the schema of the topic.
> >
> > For example:
> > In case of “debezium-avro-confluent” the schema created is a union of null 
> > and a Debezium specific record schema (before, after, op). If I use the 
> > above option to provide a specific name for the schema, I get an 
> > org.apache.avro.UnresolvedUnionException, because 
> > AvroRowDataSerializationSchema 
> > 
> >  converts the RowType to a record schema with the name “record”, which will 
> > not be found in the union, if the the Debezium specific record has a 
> > different name.
> > Union type is problematic because in the general case, if we define a union 
> > schema [schema1, 

[Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-06 Thread Feng Jin
Hi everyone,

The original discussion address is
https://issues.apache.org/jira/browse/FLINK-30126

Currently, Flink has access to many systems, including kafka, hive,
iceberg, hudi, elasticsearch, mysql... The corresponding catalog names
might be:
kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
mysql_database2_

As the platform for Flink SQL jobs, we need to maintain the meta
information of each system in the company, and when a Flink job
starts, we need to register the catalogs with the Flink table
environment so that users can access any table through the
env.executeSql interface.

When we only have a small number of catalogs, we can register them like
this, but when there are thousands of catalogs, I think there needs
to be a dynamic loading mechanism so that we can register catalogs
on demand, speed up the initialization of the table environment, and
avoid registering catalogs that are never used.

Preliminary thoughts:

A new CatalogProvider interface can be added.
It contains two methods:
* listCatalogs(), which lists the names of all catalogs the provider can supply
* getCatalog(), which returns a catalog instance by catalog name.

```java
public interface CatalogProvider {

    /** Called once when the provider is registered with the table environment. */
    default void initialize(ClassLoader classLoader, ReadableConfig config) {}

    /** Returns the catalog instance for the given name, if this provider knows it. */
    Optional<Catalog> getCatalog(String catalogName);

    /** Lists the names of all catalogs this provider can supply. */
    Set<String> listCatalogs();
}
```


The corresponding implementation in CatalogManager is as follows:

```java
public class CatalogManager {

    private @Nullable CatalogProvider catalogProvider;

    private Map<String, Catalog> catalogs;

    public void setCatalogProvider(CatalogProvider catalogProvider) {
        this.catalogProvider = catalogProvider;
    }

    public Optional<Catalog> getCatalog(String catalogName) {
        // Catalogs registered in the table environment take precedence.
        if (catalogs.containsKey(catalogName)) {
            return Optional.of(catalogs.get(catalogName));
        }
        // Otherwise, fall back to the catalog provider, if one is set.
        if (catalogProvider != null) {
            return catalogProvider.getCatalog(catalogName);
        }
        return Optional.empty();
    }
}
```
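
For illustration, a minimal in-memory implementation of the proposed interface could look like the following (a sketch only: `CatalogProvider` is the interface proposed above, not an existing Flink API, and the catalog names are placeholders):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

/** Sketch: lazily serves catalogs from a static mapping of catalog names. */
public class InMemoryCatalogProvider implements CatalogProvider {

    private final Map<String, String> catalogToDefaultDatabase = new HashMap<>();

    public InMemoryCatalogProvider() {
        catalogToDefaultDatabase.put("kafka_cluster1", "default");
        catalogToDefaultDatabase.put("hive_cluster1", "default");
    }

    @Override
    public void initialize(ClassLoader classLoader, ReadableConfig config) {
        // A real provider could read connection info for the company's meta system here.
    }

    @Override
    public Optional<Catalog> getCatalog(String catalogName) {
        if (!catalogToDefaultDatabase.containsKey(catalogName)) {
            return Optional.empty();
        }
        // A real provider would build e.g. a HiveCatalog or JdbcCatalog on demand;
        // GenericInMemoryCatalog keeps the sketch self-contained.
        return Optional.of(
                new GenericInMemoryCatalog(catalogName, catalogToDefaultDatabase.get(catalogName)));
    }

    @Override
    public Set<String> listCatalogs() {
        return catalogToDefaultDatabase.keySet();
    }
}
```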



Possible problems:

1. Catalog name conflicts: how do we choose when a registered catalog
and a catalog provided by the catalog-provider conflict?
I prefer the catalogs registered in the table environment over those
provided by the catalog-provider. If the user wishes to reference the
catalog provided by the catalog-provider, they can unregister the
conflicting catalog in the table environment through the
`unregisterCatalog` interface.

2. Number of CatalogProviders: is it possible to have multiple
catalogProvider implementations?
I don't have a good answer for this at the moment. Supporting multiple
catalogProviders would be more convenient, but there may be catalog
name conflicts between different catalogProviders.



Looking forward to your reply, any feedback is appreciated!


Best.

Feng Jin


Re: [DISCUSS] FLIP-291: Externalized Declarative Resource Management

2023-02-06 Thread Shammon FY
Hi David

Thanks for initiating this discussion. I think declaring job resource
requirements by REST API is very valuable. I have left some comments as
follows:

1) How about adding some more information, such as the vertex type
(SOURCE/MAP/JOIN, etc.), to the response of `get jobs
resource-requirements`? For users, a vertex-id alone may be difficult to
understand. (A rough sketch of what such a response entry could look like
follows below.)

2) For SQL jobs, we usually use a unified parallelism for most vertices. Can
we provide a more convenient way to set this, instead of setting it for each
vertex individually?
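
(For illustration only, a rough sketch of what such an enriched response entry could look like; the class and field names are hypothetical and not part of the FLIP:)

```java
/** Hypothetical shape of one entry in the resource-requirements response. */
public class VertexResourceRequirementEntry {

    // JobVertexID rendered as a string, as the REST API exposes it today.
    private String vertexId;

    // Human-readable hint such as "SOURCE", "MAP" or "JOIN", as suggested in point 1.
    private String vertexType;

    // Parallelism bounds that an external system could PATCH.
    private int lowerBound;
    private int upperBound;

    // Getters/setters omitted for brevity in this sketch.
}
```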


Best,
Shammon


On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl 
wrote:

> Thanks David for creating this FLIP. It sounds promising and useful to
> have. Here are some thoughts from my side (some of them might be rather a
> follow-up and not necessarily part of this FLIP):
> - I'm wondering whether it makes sense to add some kind of resource ID to
> the REST API. This would give Flink a tool to verify the PATCH request of
> the external system in a compare-and-set kind of manner. AFAIU, the process
> requires the external system to retrieve the resource requirements first
> (to retrieve the vertex IDs). A resource ID  would be sent along as a
> unique identifier for the provided setup. It's essentially the version ID
> of the currently deployed resource requirement configuration. Flink doesn't
> know whether the external system would use the provided information in some
> way to derive a new set of resource requirements for this job. The
> subsequent PATCH request with updated resource requirements would include
> the previously retrieved resource ID . The PATCH call would fail if
> there was a concurrent PATCH call in between indicating to the external
> system that the resource requirements were concurrently updated.
> - How often do we allow resource requirements to be changed? That question
> might make my previous comment on the resource ID obsolete because we could
> just make any PATCH call fail if there was a resource requirement update
> within a certain time frame before the request. But such a time period is
> something we might want to make configurable then, I guess.
> - Versioning the JobGraph in the JobGraphStore rather than overwriting it
> might be an idea. This would enable us to provide resource requirement
> changes in the UI or through the REST API. It is related to a problem
> around keeping track of the exception history within the AdaptiveScheduler
> and also having to consider multiple versions of a JobGraph. But for that
> one, we use the ExecutionGraphInfoStore right now.
> - Updating the JobGraph in the JobGraphStore makes sense. I'm just
> wondering whether we bundle two things together that are actually separate:
> The business logic and the execution configuration (the resource
> requirements). I'm aware that this is not a flaw of the current FLIP but
> rather something that was not necessary to address in the past because the
> JobGraph was kind of static. I don't remember whether that was already
> discussed while working on the AdaptiveScheduler for FLIP-160 [1]. Maybe,
> I'm missing some functionality here that requires us to have everything in
> one place. But it feels like updating the entire JobGraph which could be
> actually a "config change" is not reasonable. ...also considering the
> amount of data that can be stored in a ConfigMap/ZooKeeper node if
> versioning the resource requirement change as proposed in my previous item
> is an option for us.
> - Updating the JobGraphStore means adding more requests to the HA backend
> API. There were some concerns shared in the discussion thread [2] for
> FLIP-270 [3] on pressuring the k8s API server in the past with too many
> calls. Eventhough, it's more likely to be caused by checkpointing, I still
> wanted to bring it up. We're working on a standardized performance test to
> prepare going forward with FLIP-270 [3] right now.
>
> Best,
> Matthias
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> [2] https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>
> On Fri, Feb 3, 2023 at 10:31 AM ConradJam  wrote:
>
> > Hi David:
> >
> > Thank you for driving this FLIP, which helps reduce Flink downtime.
> >
> > For this FLIP, I would like to share a few ideas:
> >
> >
> >- When the number of "slots" is insufficient, can we stop users from
> >  rescaling, or throw something to tell the user "not enough available
> >  slots to upgrade, please check your available slots"? Or we could have
> >  a request switch (true/false) to allow this behavior.
> >
> >
> >- When the user upgrades job-vertex-parallelism, I want to have an
> >  interface to query the status of the ongoing parallelism update, so
> >  that the user or a program can understand the current status.
> >- I want to have an interface to query the current update parallelism
> >  execution 

[jira] [Created] (FLINK-30921) Too many CI failed due to "Could not connect to azure.archive.ubuntu.com"

2023-02-06 Thread Rui Fan (Jira)
Rui Fan created FLINK-30921:
---

 Summary: Too many CI failed due to "Could not connect to 
azure.archive.ubuntu.com"
 Key: FLINK-30921
 URL: https://issues.apache.org/jira/browse/FLINK-30921
 Project: Flink
  Issue Type: Bug
Reporter: Rui Fan
 Attachments: image-2023-02-06-17-59-20-019.png

!image-2023-02-06-17-59-20-019.png!

 

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45762=logs=bea52777-eaf8-5663-8482-18fbc3630e81=b2642e3a-5b86-574d-4c8a-f7e2842bfb14]

 

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30920) K8 cluster autoscaler | exclude operator id from scaler

2023-02-06 Thread Gaurav Miglani (Jira)
Gaurav Miglani created FLINK-30920:
--

 Summary: K8 cluster autoscaler | exclude operator id from scaler
 Key: FLINK-30920
 URL: https://issues.apache.org/jira/browse/FLINK-30920
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gaurav Miglani


Sometimes, in cases where the sink operators run heavy group aggregations and the 
scan mode is earliest, the Flink Kubernetes operator tries to scale/downscale those 
sink operator ids as well. There should be a way for the user to provide a list of 
operator ids/vertices for which the cluster autoscaler does not perform any scaling 
action.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-02-06 Thread Shammon FY
Hi Piotr,

Thanks for your feedback. In general, I think a `Timestamp Barrier` is a
special `Watermark`: all sources send watermarks with the same timestamp as
the `Timestamp Barrier`, and aggregation operators will align data by it. For
example, all source subtasks are assigned two unified watermarks T1 and T2,
with T1 < T2. All records with timestamp <= T1 will be aligned by T1, and
records with timestamp in (T1, T2] will be aligned by T2.

> Are you proposing that all of the inputs to stateful operators would have
to be sorted?

Records in the stream don't need to be sorted, but they should be managed by
the `Timestamp Barrier`, which means
1. Records belonging to a specific `Timestamp Barrier` may arrive unordered.
2. Computations for different timestamp barriers are ordered. For the above
example, each stateful subtask can start the computation for T2 only after it
finishes the computation for T1. Subtasks are independent of each other. (A
small illustrative sketch of this ordering follows below.)
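
(Purely illustrative, a minimal sketch of the per-subtask ordering invariant described above; the class and method names are hypothetical and not part of the FLIP:)

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: records within one barrier are only grouped, not sorted; barriers are processed in order. */
class BarrierAlignedSubtask {

    private final List<Long> recordsForT1 = new ArrayList<>();
    private final List<Long> recordsForT2 = new ArrayList<>();

    /** Records may arrive in any order; they are assigned to the barrier that covers their timestamp. */
    void onRecord(long timestamp, long t1, long t2) {
        if (timestamp <= t1) {
            recordsForT1.add(timestamp);
        } else if (timestamp <= t2) {
            recordsForT2.add(timestamp);
        }
    }

    /** The computation for T2 starts only after the computation for T1 has finished. */
    void onBarriersComplete() {
        compute(recordsForT1); // finish T1 first
        compute(recordsForT2); // then T2
    }

    private void compute(List<Long> records) {
        // stateful aggregation for one barrier would happen here
    }
}
```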

> Can you explain why do you need those 3 states? Why can committed records
be rolled back?

Here I try to define the states of data in tables according to the Timestamp
Barrier and Snapshot, and I found that the 3 states are incomplete. For
example, suppose there is a timestamp barrier T associated with checkpoint C,
and the sink operator will create snapshot S for C in the tables. The data
states in tables are as follows:
1. The sink finishes writing the data of timestamp barrier T to a table, but
snapshot S is not created in the table and T is not finished in all tables.
2. The sink finishes writing the data of timestamp barrier T to a table and
creates snapshot S according to checkpoint C, but T is not finished in all
tables.
3. Timestamp barrier T is finished in all tables, but snapshot S is not
created in all tables.
4. Timestamp barrier T is finished in all tables, and snapshot S is created
in all tables too.

Currently users can only get data from snapshots in Table Store and other
storages such as Iceberg. Users can get different "versioned" data from
tables according to their data freshness and consistency requirements.
In the future, I think we should support getting data by timestamp barrier even
before the sink operator finishes creating the snapshot. In this situation, I
call the data of the timestamp barrier "committed" if the data is written to a
table according to the barrier without a snapshot, and such data may be "rolled
back" due to a job failure. (Sorry, the term "committed" here may not be the
best fit.)

> I'm not sure if I follow. Generally speaking, why do we need MetaService
at all? Why can we only support writes to and reads from TableStore, and
not any source/sink that implements some specific interface?

It's a good point. I added a `MetaService` node in the FLIP mainly to perform
some atomic operations. For example, when multiple jobs start at the same
time and register themselves in `MetaService`, it needs to serially check
whether they write to the same table. If we do not use an
independent `MetaService Node`, we may need to introduce some other "atomic
dependency" such as ZooKeeper. But removing `MetaService Node` can make the
system more flexible, I think it's also valuable. Maybe we can carefully
design MetaService API and support different deployment modes in the next
FLIP? WDYT?


Best,
Shammon


On Fri, Feb 3, 2023 at 10:43 PM Piotr Nowojski  wrote:

> Hi Shammon,
>
> Thanks for pushing the topic further. I'm not sure how this new proposal is
> supposed to be working? How should timestamp barrier interplay with event
> time and watermarks? Or is timestamp barrier supposed to completely replace
> watermarks?
>
> > stateful and temporal operators should align them (records) according to
> their timestamp field.
>
> Are you proposing that all of the inputs to stateful operators would have
> to be sorted?
>
> > There're three states in a table for specific transaction : PreCommit,
> Commit and Snapshot
>
> Can you explain why do you need those 3 states? Why can committed records
> be rolled back?
>
> >> 10. Have you considered proposing a general consistency mechanism
> instead
> >> of restricting it to TableStore+ETL graphs? For example, it seems to me
> to
> >> be possible and valuable to define instead the contract that
> sources/sinks
> >> need to implement in order to participate in globally consistent
> snapshots.
> >
> > A general consistency mechanism is cool! In my mind, the overall
> > `consistency system` consists of three components: Streaming & Batch ETL,
> > Streaming & Batch Storage and MetaService. MetaService is decoupled from
> > Storage Layer, but it stores consistency information in persistent
> storage.
> > It can be started as an independent node or a component in a large Flink
> > cluster. In the FLIP we use TableStore as the Storage Layer. As you
> > mentioned, we plan to implement specific source and sink on the
> TableStore
> > in the first phase, and may consider other storage in the future
>
> I'm not sure if I follow. Generally speaking, why do we need MetaService at
> all? Why can 

[jira] [Created] (FLINK-30919) Fix typo in the document of User-defined Sources & Sinks

2023-02-06 Thread luoyuxia (Jira)
luoyuxia created FLINK-30919:


 Summary: Fix typo in the document of User-defined Sources & Sinks
 Key: FLINK-30919
 URL: https://issues.apache.org/jira/browse/FLINK-30919
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: luoyuxia


Just found some typos in the ability section of the User-defined Sources & Sinks 
document.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30918) Refactor CompositePkAndMultiPartitionedTableITCase to get rid of managed table

2023-02-06 Thread yuzelin (Jira)
yuzelin created FLINK-30918:
---

 Summary: Refactor CompositePkAndMultiPartitionedTableITCase to get 
rid of managed table
 Key: FLINK-30918
 URL: https://issues.apache.org/jira/browse/FLINK-30918
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30917) The user configured max parallelism does not take effect when using adaptive batch scheduler

2023-02-06 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30917:
--

 Summary: The user configured max parallelism does not take effect 
when using adaptive batch scheduler
 Key: FLINK-30917
 URL: https://issues.apache.org/jira/browse/FLINK-30917
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, the adaptive batch scheduler only respects the global maximum 
parallelism (which is configured by the option {{parallelism.default}} or 
{{execution.batch.adaptive.auto-parallelism.max-parallelism}}, see FLINK-30686 
for details) when deciding the parallelism of job vertices; the maximum 
parallelism of vertices configured by the user through {{setMaxParallelism}} 
is not respected.

In this ticket, we will change the behavior so that the user-configured max 
parallelism is also respected.
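
For context, a rough sketch of the two places a maximum parallelism can come from (illustrative only; the option key is the one named above, the job itself is a placeholder):
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Global cap currently respected by the adaptive batch scheduler.
        conf.setString("execution.batch.adaptive.auto-parallelism.max-parallelism", "128");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Per-vertex cap set via setMaxParallelism, which this ticket proposes to
        // also respect when the scheduler decides vertex parallelism.
        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .setMaxParallelism(4)
                .print();

        env.execute("max-parallelism-sketch");
    }
}
{code}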



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30916) RocksDBStateUploaderTest.testUploadedSstCanBeCleanedUp failed with assertion

2023-02-06 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30916:
-

 Summary: RocksDBStateUploaderTest.testUploadedSstCanBeCleanedUp 
failed with assertion
 Key: FLINK-30916
 URL: https://issues.apache.org/jira/browse/FLINK-30916
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.17.0
Reporter: Matthias Pohl


{{RocksDBStateUploaderTest.testUploadedSstCanBeCleanedUp}} failed due to an 
assertion:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45730=logs=f0ac5c25-1168-55a5-07ff-0e88223afed9=50bf7a25-bdc4-5e56-5478-c7b4511dde53=10328

{code}
Feb 06 02:54:01 [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 0.427 s <<< FAILURE! - in 
org.apache.flink.contrib.streaming.state.RocksDBStateUploaderTest
Feb 06 02:54:01 [ERROR] 
org.apache.flink.contrib.streaming.state.RocksDBStateUploaderTest.testUploadedSstCanBeCleanedUp
  Time elapsed: 0.115 s  <<< FAILURE!
Feb 06 02:54:01 java.lang.AssertionError: 
Feb 06 02:54:01 
Feb 06 02:54:01 Expecting empty but was: 
["379065d4-dd29-4455-b30f-4dbc53336ea2"]
Feb 06 02:54:01 at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploaderTest.testUploadedSstCanBeCleanedUp(RocksDBStateUploaderTest.java:166)
Feb 06 02:54:01 at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[...]
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30915) `flink-fs-hadoop-s3` connector is unable to find IRSA credentials

2023-02-06 Thread Samrat Deb (Jira)
Samrat Deb created FLINK-30915:
--

 Summary: `flink-fs-hadoop-s3` connector is unable to find IRSA 
credentials
 Key: FLINK-30915
 URL: https://issues.apache.org/jira/browse/FLINK-30915
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.16.0
Reporter: Samrat Deb


 

"flink-fs-s3-hadoop" connectors were not able to find the credentials just fine 
when configured with the {{WebIdentityTokenCredentialsProvider. }}

{{}}

when I try to use Flinks s3 connector I get access denied, so then I made sure 
to set the correct identity provider in my flink-conf, which was set to the 
following

{{}}

 

{{hadoop.fs.s3a.aws.credentials.provider: 
"com.amazonaws.auth.WebIdentityTokenCredentialsProvider"}}

{code:java}
2023-02-02 21:02:06,214 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut 
down.
2023-02-02 21:02:06,293 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Could not 
start cluster entrypoint KubernetesApplicationClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
    at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:255)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
 [flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
 [flink-dist-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
    at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
 ~[flink-dist-1.16.0.jar:1.16.0]
    at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
 ~[flink-dist-1.16.0.jar:1.16.0]
    ... 2 more
Caused by: java.nio.file.AccessDeniedException: 
s3:///flink-ha/basic-example/blob: getFileStatus on 
s3:///flink-ha/basic-example/blob: 
com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: 
Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 
CXJ8Y79Z8SYTBEFM; S3 Extended Request ID: 1234567/1234567; Proxy: null), S3 
Extended Request ID:123454321/123232:AccessDenied
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255) 
~[?:?]
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) 
~[?:?]
    at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3858) 
~[?:?]
    at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
 ~[?:?]
    at 
org.apache.hadoop.fs.s3a.S3AFileSystem$MkdirOperationCallbacksImpl.probePathStatus(S3AFileSystem.java:3455)
 ~[?:?]
    at 
org.apache.hadoop.fs.s3a.impl.MkdirOperation.probePathStatusOrNull(MkdirOperation.java:135)
 ~[?:?]
    at 
org.apache.hadoop.fs.s3a.impl.MkdirOperation.getPathStatusExpectingDir(MkdirOperation.java:150)
 ~[?:?]
    at 
org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:80) 
~[?:?]
    at 
org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:45) 
~[?:?]
    at 
org.apache.hadoop.fs.s3a.impl.ExecutingStoreOperation.apply(ExecutingStoreOperation.java:76)
 ~[?:?]
    at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[?:?]
    at 

[jira] [Created] (FLINK-30914) ZooKeeperLeaderElectionTest.testUnExpectedErrorForwarding failed

2023-02-06 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30914:
-

 Summary: ZooKeeperLeaderElectionTest.testUnExpectedErrorForwarding 
failed
 Key: FLINK-30914
 URL: https://issues.apache.org/jira/browse/FLINK-30914
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.3
Reporter: Matthias Pohl


{{ZooKeeperLeaderElectionTest.testUnExpectedErrorForwarding}} failed:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45731=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=747432ad-a576-5911-1e2a-68c6bedc248a=9555

{code}
Feb 06 02:46:51 [ERROR]   
ZooKeeperLeaderElectionTest.testUnExpectedErrorForwarding:724 
Feb 06 02:46:51 Expected: Expected failure cause is 

Feb 06 02:46:51  but: The throwable 
 does not contain the expected failure cause 

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30913) Various PyFlink tests fail

2023-02-06 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30913:
-

 Summary: Various PyFlink tests fail
 Key: FLINK-30913
 URL: https://issues.apache.org/jira/browse/FLINK-30913
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.3
Reporter: Matthias Pohl


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45731=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=33293
{code}
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_chaining_scalar_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_create_and_drop_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_data_types
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_deterministic
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_scalar_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_udf_in_join_condition
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_udf_in_join_condition_2
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_udf_with_constant_params
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_udf_with_rowtime_arguments
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkStreamUserDefinedFunctionTests::test_udf_without_arguments
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_all_data_types
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_all_data_types_expression
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_chaining_scalar_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_create_and_drop_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_open
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_overwrite_builtin_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_scalar_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_udf_in_join_condition
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_udf_in_join_condition_2
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_udf_with_constant_params
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkBatchUserDefinedFunctionTests::test_udf_without_arguments
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_all_data_types
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_all_data_types_expression
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_chaining_scalar_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_create_and_drop_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_open
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_overwrite_builtin_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_scalar_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_udf_in_join_condition
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_udf_in_join_condition_2
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_udf_with_constant_params
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udf.py::PyFlinkEmbeddedThreadTests::test_udf_without_arguments
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udtf.py::PyFlinkStreamUserDefinedFunctionTests::test_table_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udtf.py::PyFlinkStreamUserDefinedFunctionTests::test_table_function_with_sql_query
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udtf.py::PyFlinkBatchUserDefinedFunctionTests::test_table_function
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_udtf.py::PyFlinkBatchUserDefinedFunctionTests::test_table_function_with_sql_query
Feb 06 04:45:59 FAILED 
pyflink/table/tests/test_window.py::StreamTableWindowTests::test_over_window
Feb 06 04:45:59 FAILED 

[jira] [Created] (FLINK-30912) CreateTableAsITCase seem to have timed out on Azure

2023-02-06 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30912:
-

 Summary: CreateTableAsITCase seem to have timed out on Azure
 Key: FLINK-30912
 URL: https://issues.apache.org/jira/browse/FLINK-30912
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Affects Versions: 1.16.1
Reporter: Matthias Pohl


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45722=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=1001

There is no additional log file because Azure failed to upload logs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30911) KafkaSinkE2ECase.testStartFromSavepoint fails with TimeoutException because the topic doesn't become available in the meta file

2023-02-06 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30911:
-

 Summary: KafkaSinkE2ECase.testStartFromSavepoint fails with 
TimeoutException because the topic doesn't become available in the meta file
 Key: FLINK-30911
 URL: https://issues.apache.org/jira/browse/FLINK-30911
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: Matthias Pohl


{{KafkaSinkE2ECase.testStartFromSavepoint}} failed with a timeout exception after the 
topic didn't show up in the meta file within 60s.
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45722=logs=f8e16326-dc75-5ba0-3e95-6178dd55bf6c=15c1d318-5ca8-529f-77a2-d113a700ec34=15563

{code}
Feb 05 05:28:58 Caused by: org.apache.flink.util.FlinkRuntimeException: Failed 
to send data to Kafka kafka-single-topic-9151082293470264886--1@-1 with 
FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, 
closed=false} 
Feb 05 05:28:58 at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
Feb 05 05:28:58 at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
Feb 05 05:28:58 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
Feb 05 05:28:58 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
Feb 05 05:28:58 at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
Feb 05 05:28:58 at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
Feb 05 05:28:58 at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
Feb 05 05:28:58 at java.lang.Thread.run(Thread.java:750)
Feb 05 05:28:58 Caused by: org.apache.kafka.common.errors.TimeoutException: 
Topic kafka-single-topic-9151082293470264886 not present in metadata after 
60000 ms.
[...]
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)