Re: [DISCUSS] FLIP-398: Improve Serialization Configuration And Usage In Flink

2023-12-24 Thread Yong Fang
Hi devs,

Thanks for all the feedback. If there are no more comments, I would like to
start a vote for this FLIP, thanks again!

Best,
Fang Yong

On Wed, Dec 20, 2023 at 9:12 PM Yong Fang  wrote:

> Hi Ken,
>
> Thanks for your feedback. The purpose of this FLIP is to improve the use
> of serialization, including configurable serializers for users, providing
> serializers for composite data types, and resolving the issue of Kryo being
> enabled by default, etc. Introducing a better serialization framework would
> be a great help for Flink's performance, and it's great to see your tests on
> Fury. However, as @Xintong mentioned, this could be a huge amount of work and
> beyond the scope of this FLIP. If you're interested, I think we could create
> a new FLIP for it and discuss it further. What do you think? Thanks.
>
> Best,
> Fang Yong
>
> On Mon, Dec 18, 2023 at 11:16 AM Xintong Song 
> wrote:
>
>> Hi Ken,
>>
>> I think the main purpose of this FLIP is to change how users interact with
>> the knobs for customizing the serialization behaviors, from requiring code
>> changes to working with pure configurations. Redesigning the knobs (i.e.,
>> names, semantics, etc.), on the other hand, is not the purpose of this
>> FLIP. Preserving the existing names and semantics should also help
>> minimize
>> the migration cost for existing users. Therefore, I'm in favor of not
>> changing them.
>>
>> Concerning decoupling from Kryo, and introducing other serialization
>> frameworks like Fury, I think that's a bigger topic that is worth further
>> discussion. At the moment, I'm not aware of any community consensus on
>> doing so. And even if in the future we decide to do so, the changes needed
>> should be the same w/ or w/o this FLIP. So I'd suggest not to block this
>> FLIP on these issues.
>>
>> WDYT?
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Fri, Dec 15, 2023 at 1:40 AM Ken Krugler 
>> wrote:
>>
>> > Hi Yong,
>> >
>> > Looks good, thanks for creating this.
>> >
>> > One comment - related to my recent email about Fury, I would love to see
>> > the v2 serialization decoupled from Kryo.
>> >
>> > As part of that, instead of using xxxKryo in methods, call them
>> > xxxGeneric.
>> >
>> > A more extreme change would be to totally rely on Fury (so no more POJO
>> > serializer). Fury is faster than the POJO serializer in my tests, but
>> > this would be a much bigger change.
>> >
>> > Though it could dramatically simplify the Flink serialization support.
>> >
>> > — Ken
>> >
>> > PS - a separate issue is how to migrate state from Kryo to something
>> > like Fury, which supports schema evolution. I think this might be
>> > possible, by having a smarter deserializer that identifies state as being
>> > created by Kryo, and using (shaded) Kryo to deserialize, while still
>> > writing as Fury.
>> >
>> > > On Dec 6, 2023, at 6:35 PM, Yong Fang  wrote:
>> > >
>> > > Hi devs,
>> > >
>> > > I'd like to start a discussion about FLIP-398: Improve Serialization
>> > > Configuration And Usage In Flink [1].
>> > >
>> > > Currently, users can register custom data types and serializers in
>> > > Flink jobs through various methods, including registration in code,
>> > > configuration, and annotations. These lead to difficulties when
>> > > upgrading Flink jobs and cause priority issues.
>> > >
>> > > In Flink 2.0 we would like to manage job data types and serializers
>> > > through configuration. This FLIP will introduce a unified option for
>> > > data types and serializers, so that users can configure all custom data
>> > > types and pojo/kryo/custom serializers. In addition, this FLIP will add
>> > > more built-in serializers for complex data types such as List and Map,
>> > > and optimize the management of Avro serializers.
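>> > >
>> > > For illustration, a minimal sketch of today's code-based registration
>> > > (MyPojo, MyType and MyKryoSerializer are placeholder classes, not part
>> > > of the FLIP text):
>> > >
>> > >     StreamExecutionEnvironment env =
>> > >             StreamExecutionEnvironment.getExecutionEnvironment();
>> > >     // register a POJO type and a custom Kryo serializer in code today;
>> > >     // MyPojo / MyType / MyKryoSerializer are placeholders
>> > >     env.getConfig().registerPojoType(MyPojo.class);
>> > >     env.getConfig().registerTypeWithKryoSerializer(MyType.class, MyKryoSerializer.class);
>> > >
>> > > The FLIP aims to let users express this kind of registration purely via
>> > > configuration instead of code.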
>> > >
>> > > Looking forward to hearing from you, thanks!
>> > >
>> > > [1]
>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-398%3A+Improve+Serialization+Configuration+And+Usage+In+Flink
>> > >
>> > > Best,
>> > > Fang Yong
>> >
>> > --
>> > Ken Krugler
>> > http://www.scaleunlimited.com
>> > Custom big data solutions
>> > Flink & Pinot
>> >
>> >
>> >
>> >
>>
>


[jira] [Created] (FLINK-33935) Improve the default value doc and logic for some state backend and checkpoint related options

2023-12-24 Thread Rui Fan (Jira)
Rui Fan created FLINK-33935:
---

 Summary: Improve the default value doc and logic for some state 
backend and checkpoint related options
 Key: FLINK-33935
 URL: https://issues.apache.org/jira/browse/FLINK-33935
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 1.19.0


Some state backend and checkpoint related options don't set a default value 
directly; instead, they implement the default value in code. For example:
 * execution.checkpointing.tolerable-failed-checkpoints
 ** 
[https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints]
 * state.backend.type
 ** 
[https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#state-backend-type]

h2. Option1

execution.checkpointing.tolerable-failed-checkpoints doesn't have a default 
value, but CheckpointConfig#getTolerableCheckpointFailureNumber calls 
configuration.getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER).orElse(0).

It means that 0 is effectively the default value of 
execution.checkpointing.tolerable-failed-checkpoints.
h2. Option2

state.backend.type doesn't have a default value, but 
StateBackendLoader#loadFromApplicationOrConfigOrDefaultInternal calls 
loadStateBackendFromConfig(config, classLoader, logger). When the return value 
is null, Flink falls back to hashmap as the default state backend.

I checked all callers of StateBackendLoader#loadStateBackendFromConfig: if we 
change the default value of state.backend.type to hashmap, all of them still 
work well.
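
As a sketch of the direction (illustrative only; descriptions and deprecation 
handling are omitted), the effective defaults could be declared directly on the 
ConfigOption:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Illustrative sketch only: declare the effective defaults directly on the
// options so the generated docs and the code-level fallbacks stay in sync.
public class DefaultValueSketch {
    public static final ConfigOption<Integer> TOLERABLE_FAILURE_NUMBER =
            ConfigOptions.key("execution.checkpointing.tolerable-failed-checkpoints")
                    .intType()
                    .defaultValue(0); // today this 0 only lives in CheckpointConfig

    public static final ConfigOption<String> STATE_BACKEND_TYPE =
            ConfigOptions.key("state.backend.type")
                    .stringType()
                    .defaultValue("hashmap"); // today the fallback lives in StateBackendLoader
}
{code}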



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33934) Flink SQL source using raw format may lead to data loss

2023-12-24 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-33934:
---

 Summary: Flink SQL source using raw format may lead to data loss
 Key: FLINK-33934
 URL: https://issues.apache.org/jira/browse/FLINK-33934
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Runtime
Reporter: Cai Liuyang


In our product we encountered a case that led to data loss. The job info: 
   1. a Flink SQL job that reads data from a message queue and writes to Hive 
(it only selects the value field and does not select any metadata field)
   2. the format of the source table is the raw format
 
But if we select the value field and a metadata field at the same time, the data 
loss does not appear.
 
After reviewing the code, we found that the reason is the object reuse of the 
raw format (see code 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
 Why object reuse leads to this problem is explained below (taking Kafka as an example):
    1. RawFormatDeserializationSchema is used in the fetcher thread of the 
SourceOperator; the fetcher thread reads and deserializes data from the Kafka 
partition, then puts the data into the ElementQueue (see code [SourceOperator FetchTask 
|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
    2. The SourceOperator's main thread pulls data from the ElementQueue (which 
is shared with the fetcher thread) and processes it (see code [SourceOperator main 
thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
    3. RawFormatDeserializationSchema's deserialize function always returns the 
same object ([reused 
object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
    4. So, if the ElementQueue still holds elements that have not been consumed, 
the fetcher thread can overwrite the fields of the reused RowData that 
RawFormatDeserializationSchema::deserialize returned, which leads to data 
loss;
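
As a standalone illustration of the hazard (plain Java, not Flink code; all 
names are made up):
{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Standalone sketch of the hazard: a "fetcher" thread keeps mutating one reused
// record object while a "main" thread still holds references to it in a queue,
// so earlier records are silently overwritten.
public class ReuseHazardSketch {
    static final class ReusedRecord { byte[] value; }

    public static void main(String[] args) throws Exception {
        BlockingQueue<ReusedRecord> queue = new ArrayBlockingQueue<>(16);
        ReusedRecord reused = new ReusedRecord(); // single reused instance

        // "fetcher": deserializes two records into the SAME object
        reused.value = "record-1".getBytes();
        queue.put(reused);
        reused.value = "record-2".getBytes(); // overwrites record-1 before it is consumed
        queue.put(reused);

        // "main thread": both queue entries now point to the same mutated object
        System.out.println(new String(queue.take().value)); // prints record-2, record-1 is lost
        System.out.println(new String(queue.take().value)); // prints record-2
        // Returning a fresh object (or a copy) per deserialize call avoids this.
    }
}
{code}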
 
The reason that selecting the value field and a metadata field at the same time 
does not encounter data loss is: 
   if we select a metadata field, a new RowData object is returned, see code: 
[DynamicKafkaDeserializationSchema deserialize with metadata field 
|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
 and if we only select the value field, it reuses the RowData object that the 
formatDeserializationSchema returned, see code 
[DynamicKafkaDeserializationSchema deserialize only with value 
field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
 
To solve this problem, I think we should disable object reuse in 
RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re:Re: [DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableAPI

2023-12-24 Thread Xuyang
Hi, Jane, thanks for driving this. 


IMO, it is important to keep consistent semantics between the Table API and 
SQL, not only for maintenance, but also for the user experience. But for users, 
the impact of this modification is a bit large. Is it possible to consider 
introducing a deprecated option that allows users to fall back to the previous 
behavior (falling back by default), and then officially deprecate it in Flink 2.0?


BTW, the jira FLINK-33217[1] is caused by Flink SQL not handling the nullable 
attribute of the Row type in the way Calcite expects. However, fixing it will 
also have a relatively large impact. We may also need to check the 
corresponding code on the SQL side.


[1] https://issues.apache.org/jira/browse/FLINK-33217




--

Best!
Xuyang





On 2023-12-25 10:16:28, "Shengkai Fang"  wrote:
>Thanks for Jane and Sergey's proposal!
>
>+1 to correct the Table API behavior.
>
>I have one question: Is the influence only limited to the RowType? Does the
>Map or Array type have the same problems?
>
>Best,
>Shengkai
>
>Jane Chan  wrote on Fri, Dec 22, 2023 at 17:40:
>
>> Dear devs,
>>
>> Several issues [1][2][3] have been identified regarding the inconsistent
>> treatment of ROW type nullability between SQL and TableAPI. However,
>> addressing these discrepancies might necessitate updates to the public API.
>> Therefore, I'm initiating this discussion to engage the community in
>> forging a unified approach to resolve these challenges.
>>
>> To summarize, SQL prohibits a nullable ROW type whose fields are NOT NULL,
>> e.g. ROW<`f0` INT NOT NULL, `f1` STRING>, which is implicitly rewritten to
>> ROW<`f0` INT, `f1` STRING> by Calcite[4]. In contrast, TableAPI permits such
>> types, resulting in inconsistency.
>> For a comprehensive issue breakdown, please refer to the comment of [1].
>>
>> According to CALCITE-2464[4], a nullable ROW type with NOT NULL fields is not
>> a valid type. As a result, the behavior of TableAPI is incorrect and needs to
>> be made consistent with SQL, which means the implementation of the following
>> public API needs to be changed.
>>
>>- RowType#copy(boolean nullable) should also set the inner fields to
>>nullable if nullable is true.
>>- RowType's constructor should also check nullability.
>>- FieldsDataType#nullable() should also set the inner fields to nullable.
>>
>> In addition to the necessary changes in the implementation of the
>> aforementioned API, the semantics of the following API have also been
>> impacted.
>>
>>- `DataTypes.ROW(DataTypes.FIELD("f0",
>>DataTypes.INT().notNull())).notNull()` cannot create a type like
>>`ROW<`f0` INT NOT NULL> NOT NULL`.
>>- Idempotence for chained calls `notNull().nullable().notNull()` for
>>`Row` cannot be maintained.
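>>
>> For illustration only, a minimal sketch of the call in question (using
>> org.apache.flink.table.api.DataTypes and org.apache.flink.table.types.DataType;
>> whether the inner field stays NOT NULL is exactly the behavior under discussion):
>>
>>    // build ROW<`f0` INT NOT NULL> and then ask for a nullable outer row
>>    DataType row =
>>            DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT().notNull()))
>>                    .nullable();
>>    // TableAPI today keeps `f0` NOT NULL here; SQL/Calcite would make it nullable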
>>
>> Sergey and I have engaged in a discussion regarding the solution [1]. I'm
>> interested in gathering additional perspectives on the fix.
>>
>> Look forward to your ideas!
>>
>> Best,
>> Jane
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-31830
>> [2] https://issues.apache.org/jira/browse/FLINK-31829
>> [3] https://issues.apache.org/jira/browse/FLINK-33217
>> [4] https://issues.apache.org/jira/browse/CALCITE-2464
>>


[jira] [Created] (FLINK-33933) SerializedThrowable will be java.lang.StackOverflowError when AsyncLookupFunction throws an exception

2023-12-24 Thread KarlManong (Jira)
KarlManong created FLINK-33933:
--

 Summary: SerializedThrowable will be java.lang.StackOverflowError 
when AsyncLookupFunction throws an exception
 Key: FLINK-33933
 URL: https://issues.apache.org/jira/browse/FLINK-33933
 Project: Flink
  Issue Type: Bug
 Environment: tested from 1.16 to 1.18, same behavior 
Reporter: KarlManong


Here is a simple example
{code:java}
// Simplified example that reproduces the issue.

import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class TableA implements LookupTableSource {

    @Nullable
    private final LookupCache cache;

    public TableA(@Nullable LookupCache cache) {
        this.cache = cache;
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        LookupFunctionA lookupFunction = new LookupFunctionA();

        if (cache != null) {
            return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
        } else {
            return AsyncLookupFunctionProvider.of(lookupFunction);
        }
    }

    @Override
    public DynamicTableSource copy() {
        return new TableA(cache);
    }

    @Override
    public String asSummaryString() {
        return "Async Table";
    }
}

public class LookupFunctionA extends AsyncLookupFunction {

    @Override
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
        // completing the future exceptionally is what triggers the reported
        // StackOverflowError when the cause is wrapped into SerializedThrowable
        future.completeExceptionally(new IOException("request failed"));
        return future;
    }
}
{code}
When using TableA, a StackOverflowError occurs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33932) Support retry mechanism for rocksdb uploader

2023-12-24 Thread Guojun Li (Jira)
Guojun Li created FLINK-33932:
-

 Summary: Support retry mechanism for rocksdb uploader
 Key: FLINK-33932
 URL: https://issues.apache.org/jira/browse/FLINK-33932
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Guojun Li


The RocksDB uploader will throw an exception and decline the current checkpoint 
if there are errors when uploading to a remote file system like HDFS.

The exception is as below:

2023-12-19 08:46:00,197 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
checkpoint 2 by task 
5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job 
a025f19e at 
application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
org.apache.flink.util.SerializedThrowable: 
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
checkpoint failed.
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
~[?:?]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
Calc[133] (184/500)#0.
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: 
java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
to file and close the file system output stream to 
hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at 
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
Could not flush to file and close the file system output stream to 
hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
stream state handle
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
 ~[?:?]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: 
java.net.ConnectException: Connection timed out
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) 
~[?:?]
    at 
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) 
~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) 
~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    at 
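
For reference, a minimal sketch of the kind of retry wrapper that could be 
applied around each file upload (plain Java; names, retry counts and backoff 
are illustrative only, not the actual design):
{code:java}
import java.io.IOException;
import java.util.concurrent.Callable;

// Illustrative sketch only: retry a flaky upload a few times with a simple
// linear backoff before giving up (and thus declining the checkpoint).
public final class UploadRetrySketch {

    public static <T> T callWithRetries(Callable<T> upload, int maxAttempts, long backoffMs)
            throws Exception {
        IOException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return upload.call();
            } catch (IOException e) { // e.g. transient HDFS connection timeouts
                lastError = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs * attempt);
                }
            }
        }
        if (lastError == null) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        // only fail the checkpoint after all attempts are exhausted
        throw lastError;
    }
}
{code}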

Re: [DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableAPI

2023-12-24 Thread Shengkai Fang
Thanks for Jane and Sergey's proposal!

+1 to correct the Table API behavior.

I have one question: Is the influence only limited to the RowType? Does the
Map or Array type have the same problems?

Best,
Shengkai

Jane Chan  wrote on Fri, Dec 22, 2023 at 17:40:

> Dear devs,
>
> Several issues [1][2][3] have been identified regarding the inconsistent
> treatment of ROW type nullability between SQL and TableAPI. However,
> addressing these discrepancies might necessitate updates to the public API.
> Therefore, I'm initiating this discussion to engage the community in
> forging a unified approach to resolve these challenges.
>
> To summarize, SQL prohibits a nullable ROW type whose fields are NOT NULL,
> e.g. ROW<`f0` INT NOT NULL, `f1` STRING>, which is implicitly rewritten to
> ROW<`f0` INT, `f1` STRING> by Calcite[4]. In contrast, TableAPI permits such
> types, resulting in inconsistency.
> For a comprehensive issue breakdown, please refer to the comment of [1].
>
> According to CALCITE-2464[4], a nullable ROW type with NOT NULL fields is not
> a valid type. As a result, the behavior of TableAPI is incorrect and needs to
> be made consistent with SQL, which means the implementation of the following
> public API needs to be changed.
>
>- RowType#copy(boolean nullable) should also set the inner fields to
>nullable if nullable is true.
>- RowType's constructor should also check nullability.
>- FieldsDataType#nullable() should also set the inner fields to nullable.
>
> In addition to the necessary changes in the implementation of the
> aforementioned API, the semantics of the following API have also been
> impacted.
>
>- `DataTypes.ROW(DataTypes.FIELD("f0",
>DataTypes.INT().notNull())).notNull()` cannot create a type like
>`ROW<`f0` INT NOT NULL> NOT NULL`.
>- Idempotence for chained calls `notNull().nullable().notNull()` for
>`Row` cannot be maintained.
>
> Sergey and I have engaged in a discussion regarding the solution [1]. I'm
> interested in gathering additional perspectives on the fix.
>
> Look forward to your ideas!
>
> Best,
> Jane
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-31830
> [2] https://issues.apache.org/jira/browse/FLINK-31829
> [3] https://issues.apache.org/jira/browse/FLINK-33217
> [4] https://issues.apache.org/jira/browse/CALCITE-2464
>