[jira] [Created] (FLINK-35042) Streaming File Sink s3 end-to-end test failed as TM lost

2024-04-07 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35042:
--

 Summary: Streaming File Sink s3 end-to-end test failed as TM lost
 Key: FLINK-35042
 URL: https://issues.apache.org/jira/browse/FLINK-35042
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-07 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35041:
--

 Summary: 
IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
 Key: FLINK-35041
 URL: https://issues.apache.org/jira/browse/FLINK-35041
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Two potential bugs in Flink ML

2024-04-07 Thread Komal M
Hi Yungfeng,


Thank you so much for getting back!

For the first bug, here is a sample code that should reproduce it. All it does 
is subtract 1 from the feedback stream until the tuples reach 0.0. For each 
subtraction it outputs a relevant message in the ‘finalOutput’ stream. These 
messages are stored in the keyedState of KeyedCoProcessFunction and are 
populated by a dataset stream called initialStates. For each key there are 
different messages associated with it, hence the need for MapState.

 For the second bug, let me compare my implementation to the references you 
have provided and get back to you on that.


import java.util.*;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


public class Test {
public static void main(String[] args) throws Exception {
// Sets up the execution environment, which is the main entry point
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();

// sample datastreams (they are assumed to be unbounded streams outside of this 
test environment)
List> feedbackinitializer = Arrays.asList(
new Tuple2<>("A", 2.0),
new Tuple2<>("B", 3.0),
new Tuple2<>("C", 1.0),
new Tuple2<>("D", 1.0)
);

List> initialStates = Arrays.asList(
new Tuple3<>("A", 0.0, "Final Output A"),
new Tuple3<>("A", 1.0, "Test 1A"),
new Tuple3<>("B", 2.0, "Test 2B"),
new Tuple3<>("B", 1.0, "Test 1B"),
new Tuple3<>("B", 0.0, "Final Output B"),
new Tuple3<>("C", 0.0, "No Change C"),
new Tuple3<>("D", 0.0, "Final Output D")
);


DataStream> feedbackStream = 
env.fromCollection(feedbackinitializer);
DataStream> initialStateStream = 
env.fromCollection(initialStates);

//parallelize
DataStream> feedbackParallel = 
feedbackStream.keyBy(x -> x.f0)
.map(i -> Tuple2.of(i.f0,i.f1))
.returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
DataStream> initialStateParallel = 
initialStateStream.keyBy(x -> x.f0)
.map(i -> Tuple3.of(i.f0,i.f1,i.f2))
.returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.STRING));



//iterate
DataStreamList result = Iterations.iterateUnboundedStreams(
DataStreamList.of(feedbackParallel),
DataStreamList.of(initialStateParallel),
(variableStreams, dataStreams) -> {
DataStream> modelUpdate = 
variableStreams.get(0);
DataStream> stateStream = 
dataStreams.get(0);


OutputTag finalOutputTag = new 
OutputTag("msgs") {
};

SingleOutputStreamOperator> 
newModelUpdate = stateStream.connect(modelUpdate).keyBy(0, 0).process(new 
KeyedCoProcessFunction, Tuple2, Tuple2>() {
private transient MapState state;

@Override
public void processElement1(Tuple3 stateUpdate, Context context, Collector> 
collector) throws Exception {
state.put(stateUpdate.f1, stateUpdate.f2); //load 
stateStream into mapState
}

@Override
public void processElement2(Tuple2 
modelUpdate, Context context, Collector> collector) 
throws Exception {
double weight = modelUpdate.f1;
weight = weight - 1;
//subtract 1 until 0.0
if (weight > -1.0) {
collector.collect(Tuple2.of(modelUpdate.f0, 
weight));
context.output(finalOutputTag, 
state.get(weight));
}
}

@Override
public void open(Configuration config) {
  

[jira] [Created] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-07 Thread Rui Fan (Jira)
Rui Fan created FLINK-35040:
---

 Summary: The performance of serializerHeavyString regresses since 
April 3
 Key: FLINK-35040
 URL: https://issues.apache.org/jira/browse/FLINK-35040
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.20.0
Reporter: Rui Fan
 Attachments: image-2024-04-08-10-51-07-403.png

The performance of serializerHeavyString regresses since April 3, and had not 
yet recovered on April 8th.

http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200


 !image-2024-04-08-10-51-07-403.png! 





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35039) Create Profiling JobManager/TaskManager Instance failed

2024-04-07 Thread JJJJude (Jira)
ude created FLINK-35039:
---

 Summary: Create Profiling JobManager/TaskManager Instance failed
 Key: FLINK-35039
 URL: https://issues.apache.org/jira/browse/FLINK-35039
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.19.0
 Environment: Hadoop 3.2.2
Flink 1.19
Reporter: ude
 Attachments: image-2024-04-08-10-21-31-066.png, 
image-2024-04-08-10-21-48-417.png, image-2024-04-08-10-30-16-683.png

I'm test the "async-profiler" feature in version 1.19, but when I submit a task 
in yarn per-job mode, I get an error  when I click Create Profiling Instance on 
the flink Web UI page.
!image-2024-04-08-10-21-31-066.png!

!image-2024-04-08-10-21-48-417.png!

The error message obviously means that the yarn proxy server does not support 
*POST* calls. I checked the code of _*WebAppProxyServlet.java*_ and found that 
the *POST* method is indeed not supported, so I changed it to *PUT* method and 
the call was successful.

!image-2024-04-08-10-30-16-683.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35038) Bump test dependency org.yaml:snakeyaml to 2.2

2024-04-07 Thread Ufuk Celebi (Jira)
Ufuk Celebi created FLINK-35038:
---

 Summary: Bump test dependency org.yaml:snakeyaml to 2.2 
 Key: FLINK-35038
 URL: https://issues.apache.org/jira/browse/FLINK-35038
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Affects Versions: 3.1.0
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 3.1.0


Usage of SnakeYAML via {{flink-shaded}} was replaced by an explicit test scope 
dependency on {{org.yaml:snakeyaml:1.31}} with FLINK-34193.

This outdated version of SnakeYAML triggers security warnings. These should not 
be an actual issue given the test scope, but we should consider bumping the 
version for security hygiene purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER

2024-04-07 Thread yisha zhou (Jira)
yisha zhou created FLINK-35037:
--

 Summary: Optimize uniqueKeys and upsertKeys inference of windows 
with ROW_NUMBER
 Key: FLINK-35037
 URL: https://issues.apache.org/jira/browse/FLINK-35037
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: yisha zhou


In current Implementation, relNodes with Window type will only deliver 
upsert/unique keys of their inputs if these keys contains the partition keys.

However windows with ROW_NUMBER can also produce upsert/unique keys.

For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student {code}
(class, rn) is a valid uniqueKeys candidate. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-07 Thread Ron liu
Hi, Dev

This is a summary letter. After several rounds of discussion, there is a
strong consensus about the FLIP proposal and the issues it aims to address.
The current point of disagreement is the naming of the new concept. I have
summarized the candidates as follows:

1. Derived Table (Inspired by Google Lookers)
- Pros: Google Lookers has introduced this concept, which is designed
for building Looker's automated modeling, aligning with our purpose for the
stream-batch automatic pipeline.

- Cons: The SQL standard uses derived table term extensively, vendors
adopt this for simply referring to a table within a subclause.

2. Materialized Table: It means materialize the query result to table,
similar to Db2 MQT (Materialized Query Tables). In addition, Snowflake
Dynamic Table's predecessor is also called Materialized Table.

3. Updating Table (From Timo)

4. Updating Materialized View (From Timo)

5. Refresh/Live Table (From Martijn)

As Martijn said, naming is a headache, looking forward to more valuable
input from everyone.

[1]
https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables
[2] https://www.ibm.com/docs/en/db2/11.5?topic=tables-materialized-query
[3]
https://community.denodo.com/docs/html/browse/6.0/vdp/vql/materialized_tables/creating_materialized_tables/creating_materialized_tables

Best,
Ron

Ron liu  于2024年4月7日周日 15:55写道:

> Hi, Lorenzo
>
> Thank you for your insightful input.
>
> >>> I think the 2 above twisted the materialized view concept to more than
> just an optimization for accessing pre-computed aggregates/filters.
> I think that concept (at least in my mind) is now adherent to the
> semantics of the words themselves ("materialized" and "view") than on its
> implementations in DBMs, as just a view on raw data that, hopefully, is
> constantly updated with fresh results.
> That's why I understand Timo's et al. objections.
>
> Your understanding of Materialized Views is correct. However, in our
> scenario, an important feature is the support for Update & Delete
> operations, which the current Materialized Views cannot fulfill. As we
> discussed with Timo before, if Materialized Views needs to support data
> modifications, it would require an extension of new keywords, such as
> CREATING xxx (UPDATING) MATERIALIZED VIEW.
>
> >>> Still, I don't understand why we need another type of special table.
> Could you dive deep into the reasons why not simply adding the FRESHNESS
> parameter to standard tables?
>
> Firstly, I need to emphasize that we cannot achieve the design goal of
> FLIP through the CREATE TABLE syntax combined with a FRESHNESS parameter.
> The proposal of this FLIP is to use Dynamic Table + Continuous Query, and
> combine it with FRESHNESS to realize a streaming-batch unification.
> However, CREATE TABLE is merely a metadata operation and cannot
> automatically start a background refresh job. To achieve the design goal of
> FLIP with standard tables, it would require extending the CTAS[1] syntax to
> introduce the FRESHNESS keyword. We considered this design initially, but
> it has following problems:
>
> 1. Distinguishing a table created through CTAS as a standard table or as a
> "special" standard table with an ongoing background refresh job using the
> FRESHNESS keyword is very obscure for users.
> 2. It intrudes on the semantics of the CTAS syntax. Currently, tables
> created using CTAS only add table metadata to the Catalog and do not record
> attributes such as query. There are also no ongoing background refresh
> jobs, and the data writing operation happens only once at table creation.
> 3. For the framework, when we perform a certain kind of Alter Table
> behavior for a table, for the table created by specifying FRESHNESS and did
> not specify the FRESHNESS created table behavior how to distinguish , which
> will also cause confusion.
>
> In terms of the design goal of combining Dynamic Table + Continuous Query,
> the FLIP proposal cannot be realized by only extending the current stardand
> tables, so a new kind of dynamic table needs to be introduced at the
> first-level concept.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#as-select_statement
>
> Best,
> Ron
>
>  于2024年4月3日周三 22:25写道:
>
>> Hello everybody!
>> Thanks for the FLIP as it looks amazing (and I think the prove is this
>> deep discussion it is provoking :))
>>
>> I have a couple of comments to add to this:
>>
>> Even though I get the reason why you rejected MATERIALIZED VIEW, I still
>> like it a lot, and I would like to provide pointers on how the materialized
>> view concept twisted in last years:
>>
>> • Materialize DB (https://materialize.com/)
>> • The famous talk by Martin Kleppmann "turning the database inside out" (
>> https://www.youtube.com/watch?v=fU9hR3kiOK0)
>>
>> I think the 2 above twisted the materialized view concept to more than
>> just an optimization for accessing pre-computed aggregates/filters.
>> I think 

[jira] [Created] (FLINK-35036) Flink CDC Job cancel with savepoint failed

2024-04-07 Thread Fly365 (Jira)
Fly365 created FLINK-35036:
--

 Summary: Flink CDC Job cancel with savepoint failed
 Key: FLINK-35036
 URL: https://issues.apache.org/jira/browse/FLINK-35036
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
 Environment: Flink 1.15.2

Flink CDC 2.4.2

Oracle 19C

Doris 2.0.3
Reporter: Fly365
 Attachments: image-2024-04-07-17-35-23-136.png

With the Flink CDC job, I want oracle data to doris, in the  snapshot,canel the 
Flink CDC Job with savepoint,the job cancel failed.

使用Flink CDC,将Oracle 
19C的数据表同步到Doris中,在初始化快照阶段,同步了一部分数据但还没有到增量阶段,此时取消CDC任务并保存Flink 
Savepoint,取消任务失败;而在任务进入增量阶段后,取消任务并保存savepoint是可以的,请问存量数据同步阶段,为何savepoint失败?

!image-2024-04-07-17-35-23-136.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-07 Thread Ron liu
Hi, Lorenzo

Thank you for your insightful input.

>>> I think the 2 above twisted the materialized view concept to more than
just an optimization for accessing pre-computed aggregates/filters.
I think that concept (at least in my mind) is now adherent to the semantics
of the words themselves ("materialized" and "view") than on its
implementations in DBMs, as just a view on raw data that, hopefully, is
constantly updated with fresh results.
That's why I understand Timo's et al. objections.

Your understanding of Materialized Views is correct. However, in our
scenario, an important feature is the support for Update & Delete
operations, which the current Materialized Views cannot fulfill. As we
discussed with Timo before, if Materialized Views needs to support data
modifications, it would require an extension of new keywords, such as
CREATING xxx (UPDATING) MATERIALIZED VIEW.

>>> Still, I don't understand why we need another type of special table.
Could you dive deep into the reasons why not simply adding the FRESHNESS
parameter to standard tables?

Firstly, I need to emphasize that we cannot achieve the design goal of FLIP
through the CREATE TABLE syntax combined with a FRESHNESS parameter. The
proposal of this FLIP is to use Dynamic Table + Continuous Query, and
combine it with FRESHNESS to realize a streaming-batch unification.
However, CREATE TABLE is merely a metadata operation and cannot
automatically start a background refresh job. To achieve the design goal of
FLIP with standard tables, it would require extending the CTAS[1] syntax to
introduce the FRESHNESS keyword. We considered this design initially, but
it has following problems:

1. Distinguishing a table created through CTAS as a standard table or as a
"special" standard table with an ongoing background refresh job using the
FRESHNESS keyword is very obscure for users.
2. It intrudes on the semantics of the CTAS syntax. Currently, tables
created using CTAS only add table metadata to the Catalog and do not record
attributes such as query. There are also no ongoing background refresh
jobs, and the data writing operation happens only once at table creation.
3. For the framework, when we perform a certain kind of Alter Table
behavior for a table, for the table created by specifying FRESHNESS and did
not specify the FRESHNESS created table behavior how to distinguish , which
will also cause confusion.

In terms of the design goal of combining Dynamic Table + Continuous Query,
the FLIP proposal cannot be realized by only extending the current stardand
tables, so a new kind of dynamic table needs to be introduced at the
first-level concept.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#as-select_statement

Best,
Ron

 于2024年4月3日周三 22:25写道:

> Hello everybody!
> Thanks for the FLIP as it looks amazing (and I think the prove is this
> deep discussion it is provoking :))
>
> I have a couple of comments to add to this:
>
> Even though I get the reason why you rejected MATERIALIZED VIEW, I still
> like it a lot, and I would like to provide pointers on how the materialized
> view concept twisted in last years:
>
> • Materialize DB (https://materialize.com/)
> • The famous talk by Martin Kleppmann "turning the database inside out" (
> https://www.youtube.com/watch?v=fU9hR3kiOK0)
>
> I think the 2 above twisted the materialized view concept to more than
> just an optimization for accessing pre-computed aggregates/filters.
> I think that concept (at least in my mind) is now adherent to the
> semantics of the words themselves ("materialized" and "view") than on its
> implementations in DBMs, as just a view on raw data that, hopefully, is
> constantly updated with fresh results.
> That's why I understand Timo's et al. objections.
> Still I understand there is no need to add confusion :)
>
> Still, I don't understand why we need another type of special table.
> Could you dive deep into the reasons why not simply adding the FRESHNESS
> parameter to standard tables?
>
> I would say that as a very seamless implementation with the goal of a
> unification of batch and streaming.
> If we stick to a unified world, I think that Flink should just provide 1
> type of table that is inherently dynamic.
> Now, depending on FRESHNESS objectives / connectors used in WITH, that
> table can be backed by a stream or batch job as you explained in your FLIP.
>
> Maybe I am totally missing the point :)
>
> Thank you in advance,
> Lorenzo
> On Apr 3, 2024 at 15:25 +0200, Martijn Visser ,
> wrote:
> > Hi all,
> >
> > Thanks for the proposal. While the FLIP talks extensively on how
> Snowflake
> > has Dynamic Tables and Databricks has Delta Live Tables, my understanding
> > is that Databricks has CREATE STREAMING TABLE [1] which relates with this
> > proposal.
> >
> > I do have concerns about using CREATE DYNAMIC TABLE, specifically about
> > confusing the users who are familiar with Snowflake's approach where you
> > can't change the content via 

[jira] [Created] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-07 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35035:
--

 Summary: Reduce job pause time when cluster resources are expanded 
in adaptive mode
 Key: FLINK-35035
 URL: https://issues.apache.org/jira/browse/FLINK-35035
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.19.0
Reporter: yuanfenghu


When 'jobmanager.scheduler = adaptive' , job graph changes triggered by cluster 
expansion will cause long-term task stagnation. We should reduce this impact.
As an example:
I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
When I add slots the task will trigger jobgraph changes,by
org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
However, the five new slots I added were not discovered at the same time (for 
convenience, I assume that a taskmanager has one slot), because no matter what 
environment we add, we cannot guarantee that the new slots will be added at 
once, so this will cause onNewResourcesAvailable triggers repeatedly
,If each new slot action has a certain interval, then the jobgraph will 
continue to change during this period. What I hope is that there will be a 
stable time to configure the cluster resources, and then go to it after the 
number of cluster slots has been stable for a certain period of time. Trigger 
jobgraph changes to avoid this situation
 。



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35034) codegen compile error raised when use kafka connector and protobuf format

2024-04-07 Thread yufeng.sun (Jira)
yufeng.sun created FLINK-35034:
--

 Summary: codegen compile error raised when use kafka connector and 
protobuf format
 Key: FLINK-35034
 URL: https://issues.apache.org/jira/browse/FLINK-35034
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile)
Affects Versions: 1.17.2
Reporter: yufeng.sun


The following error messages and stack were encountered When i using Flink SQL 
with Kafka connector and protobuf format:
{code:java}
2024-03-23 23:23:38,852 ERROR 
org.apache.flink.formats.protobuf.util.PbCodegenUtils        [] - Protobuf 
codegen compile error: package 
org.apache.flink.formats.protobuf.deserialize;import 
org.apache.flink.table.data.RowData;import 
org.apache.flink.table.data.ArrayData;import 
org.apache.flink.table.data.binary.BinaryStringData;import 
org.apache.flink.table.data.GenericRowData;import 
org.apache.flink.table.data.GenericMapData;import 
org.apache.flink.table.data.GenericArrayData;import java.util.ArrayList;import 
java.util.List;import java.util.Map;import java.util.HashMap;import 
com.google.protobuf.ByteString;public class 
GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{public static RowData 
decode(.UserProtoBuf.User message){RowData rowData=null;.UserProtoBuf.User 
message0 = message;GenericRowData rowData0 = new GenericRowData(7);Object 
elementDataVar1 = null;elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);Object elementDataVar2 = 
null;elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);Object elementDataVar3 = 
null;elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);Object elementDataVar4 = 
null;elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);Object elementDataVar5 = 
null;elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);Object elementDataVar6 = 
null;elementDataVar6 = 
BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);Object elementDataVar7 = 
null;elementDataVar7 = 
BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);rowData = rowData0;
return rowData;}}
2024-03-23 23:23:38,856 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 
(c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) 
switched from INITIALIZING to FAILED with failure 
cause:org.apache.flink.formats.protobuf.PbCodegenException: 
org.apache.flink.api.common.InvalidProgramException: Program cannot be 
compiled. This is a bug. Please file an issue. at 
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.(ProtoToRowConverter.java:124)
 ~[protobufTest-1.0-SNAPSHOT-1.jar:?]  at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64)
 ~[protobufTest-1.0-SNAPSHOT-1.jar:?]   at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94)
 
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
   at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47)
 
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
 at 
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144)
 
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at 
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135)
 
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318)
 ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
 ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778)
 ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745)
 ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]   at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
 ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]   at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) 

[jira] [Created] (FLINK-35031) Event timer firing under async execution model

2024-04-07 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35031:
--

 Summary: Event timer firing under async execution model
 Key: FLINK-35031
 URL: https://issues.apache.org/jira/browse/FLINK-35031
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35030) Introduce Epoch Manager for watermark under async execution

2024-04-07 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35030:
--

 Summary: Introduce Epoch Manager for watermark under async 
execution
 Key: FLINK-35030
 URL: https://issues.apache.org/jira/browse/FLINK-35030
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35029) Store timer in JVM heap when async execution enabled

2024-04-07 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35029:
--

 Summary: Store timer in JVM heap when async execution enabled
 Key: FLINK-35029
 URL: https://issues.apache.org/jira/browse/FLINK-35029
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35028) Processing timer firing under async execution model

2024-04-07 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35028:
--

 Summary: Processing timer firing under async execution model
 Key: FLINK-35028
 URL: https://issues.apache.org/jira/browse/FLINK-35028
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController

2024-04-07 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35027:
--

 Summary: Implement checkpoint drain in AsyncExecutionController
 Key: FLINK-35027
 URL: https://issues.apache.org/jira/browse/FLINK-35027
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)