Re: [DISCUSS] FLIP-398: Improve Serialization Configuration And Usage In Flink
Hi devs,

Thanks for all the feedback. If there are no more comments, I would like to start a vote for this FLIP, thanks again!

Best,
Fang Yong

On Wed, Dec 20, 2023 at 9:12 PM Yong Fang wrote:
> Hi Ken,
>
> Thanks for your feedback. The purpose of this FLIP is to improve the use of serialization, including configurable serializers for users, providing serializers for composite data types, and resolving the default enabling of Kryo, etc. Introducing a better serialization framework would be a great help for Flink's performance, and it's great to see your tests on Fury. However, as @Xintong mentioned, this could be a huge amount of work and is beyond the scope of this FLIP. If you're interested, I think we could create a new FLIP for it and discuss it further. What do you think? Thanks.
>
> Best,
> Fang Yong
>
> On Mon, Dec 18, 2023 at 11:16 AM Xintong Song wrote:
>> Hi Ken,
>>
>> I think the main purpose of this FLIP is to change how users interact with the knobs for customizing the serialization behaviors, from requiring code changes to working with pure configurations. Redesigning the knobs (i.e., names, semantics, etc.), on the other hand, is not the purpose of this FLIP. Preserving the existing names and semantics should also help minimize the migration cost for existing users. Therefore, I'm in favor of not changing them.
>>
>> Concerning decoupling from Kryo, and introducing other serialization frameworks like Fury, I think that's a bigger topic that is worth further discussion. At the moment, I'm not aware of any community consensus on doing so. And even if in the future we decide to do so, the changes needed should be the same w/ or w/o this FLIP. So I'd suggest not to block this FLIP on these issues.
>>
>> WDYT?
>>
>> Best,
>>
>> Xintong
>>
>> On Fri, Dec 15, 2023 at 1:40 AM Ken Krugler wrote:
>> > Hi Yong,
>> >
>> > Looks good, thanks for creating this.
>> >
>> > One comment - related to my recent email about Fury, I would love to see the v2 serialization decoupled from Kryo.
>> >
>> > As part of that, instead of using xxxKryo in methods, call them xxxGeneric.
>> >
>> > A more extreme change would be to totally rely on Fury (so no more POJO serializer). Fury is faster than the POJO serializer in my tests, but this would be a much bigger change.
>> >
>> > Though it could dramatically simplify the Flink serialization support.
>> >
>> > — Ken
>> >
>> > PS - a separate issue is how to migrate state from Kryo to something like Fury, which supports schema evolution. I think this might be possible, by having a smarter deserializer that identifies state as being created by Kryo, and using (shaded) Kryo to deserialize, while still writing as Fury.
>> >
>> > > On Dec 6, 2023, at 6:35 PM, Yong Fang wrote:
>> > >
>> > > Hi devs,
>> > >
>> > > I'd like to start a discussion about FLIP-398: Improve Serialization Configuration And Usage In Flink [1].
>> > >
>> > > Currently, users can register custom data types and serializers in Flink jobs through various methods, including registration in code, configuration, and annotations. These lead to difficulties in upgrading Flink jobs and priority issues.
>> > >
>> > > In flink-2.0 we would like to manage job data types and serializers through configurations. This FLIP will introduce a unified option for data type and serializer and users can configure all custom data types and pojo/kryo/custom serializers. In addition, this FLIP will add more built-in serializers for complex data types such as List and Map, and optimize the management of Avro Serializers.
>> > >
>> > > Looking forward to hearing from you, thanks!
>> > > >> > > [1] >> > > >> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-398%3A+Improve+Serialization+Configuration+And+Usage+In+Flink >> > > >> > > Best, >> > > Fang Yong >> > >> > -- >> > Ken Krugler >> > http://www.scaleunlimited.com >> > Custom big data solutions >> > Flink & Pinot >> > >> > >> > >> > >> >
[jira] [Created] (FLINK-33935) Improve the default value doc and logic for some state backend and checkpoint related options
Rui Fan created FLINK-33935:
---

Summary: Improve the default value doc and logic for some state backend and checkpoint related options
Key: FLINK-33935
URL: https://issues.apache.org/jira/browse/FLINK-33935
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Rui Fan
Assignee: Rui Fan
Fix For: 1.19.0

Some state backend and checkpoint related options don't declare their default value directly; instead, the default is implemented in code. For example:
* execution.checkpointing.tolerable-failed-checkpoints
** [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints]
* state.backend.type
** [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#state-backend-type]

h2. Option1

execution.checkpointing.tolerable-failed-checkpoints doesn't have a default value, but CheckpointConfig#getTolerableCheckpointFailureNumber calls configuration.getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER).orElse(0). This means 0 is effectively the default value of execution.checkpointing.tolerable-failed-checkpoints.

h2. Option2

state.backend.type doesn't have a default value, but StateBackendLoader#loadFromApplicationOrConfigOrDefaultInternal calls loadStateBackendFromConfig(config, classLoader, logger). When the return value is null, Flink falls back to hashmap as the default state backend.

I checked all callers of StateBackendLoader#loadStateBackendFromConfig; all of them still work correctly if the default value of state.backend.type is changed to hashmap.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
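The difference between an implicit, in-code default and a declared one can be sketched with simplified stand-ins. `SimpleOption`, `SimpleConfig`, and `DefaultValueDemo` below are hypothetical classes, not Flink's `ConfigOption`/`Configuration`; in Flink itself the "after" variant would presumably be declared with the option builder, along the lines of `ConfigOptions.key(...).intType().defaultValue(0)`. Without a declared default, every caller must repeat its own `orElse(0)` and the generated docs show no default; declaring it on the option keeps the value in one place.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a typed config option; not Flink's ConfigOption class. */
final class SimpleOption<T> {
    final String key;
    final T defaultValue; // null means "no declared default", so docs would show none

    SimpleOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

/** Hypothetical stand-in for a configuration backed by a plain map. */
final class SimpleConfig {
    private final Map<String, Object> values;

    SimpleConfig(Map<String, Object> values) {
        this.values = Map.copyOf(values);
    }

    /** Returns the explicitly configured value, if any, ignoring the declared default. */
    <T> Optional<T> getOptional(SimpleOption<T> option, Class<T> type) {
        return Optional.ofNullable(values.get(option.key)).map(type::cast);
    }

    /** Returns the configured value, or the option's declared default when unset. */
    <T> T get(SimpleOption<T> option, Class<T> type) {
        return getOptional(option, type).orElse(option.defaultValue);
    }
}

final class DefaultValueDemo {
    // Before: no declared default; each caller hard-codes its own fallback.
    static final SimpleOption<Integer> TOLERABLE_BEFORE =
            new SimpleOption<>("execution.checkpointing.tolerable-failed-checkpoints", null);
    // After: the default lives on the option, where docs generation can see it.
    static final SimpleOption<Integer> TOLERABLE_AFTER =
            new SimpleOption<>("execution.checkpointing.tolerable-failed-checkpoints", 0);
    static final SimpleOption<String> BACKEND_AFTER =
            new SimpleOption<>("state.backend.type", "hashmap");

    static int tolerableBefore(SimpleConfig conf) {
        return conf.getOptional(TOLERABLE_BEFORE, Integer.class).orElse(0); // implicit default
    }

    static int tolerableAfter(SimpleConfig conf) {
        return conf.get(TOLERABLE_AFTER, Integer.class); // declared default
    }
}
```

Both variants return 0 for an empty configuration; the only change is where the 0 is written down, which is exactly what determines whether the docs can show it.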
[jira] [Created] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
Cai Liuyang created FLINK-33934:
---

Summary: Flink SQL Source use raw format maybe lead to data lost
Key: FLINK-33934
URL: https://issues.apache.org/jira/browse/FLINK-33934
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
Reporter: Cai Liuyang

In our product we encountered a case that leads to data loss. The job info:
1. The job uses Flink SQL to read data from a message queue and write it to Hive (it selects only the value field, not any metadata field).
2. The format of the source table is raw.

If we select the value field and a metadata field at the same time, the data loss does not occur.

After reviewing the code, we found that the cause is object reuse in the raw format (see code [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Object reuse leads to this problem as follows (taking Kafka as an example):
1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator. The fetcher thread reads and deserializes data from a Kafka partition, then puts the data into the ElementQueue (see code [SourceOperator FetcherTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see code [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
3. RawFormatDeserializationSchema's deserialize function returns the same object on every call ([reuse object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can change the fields of the reused RowData returned by RawFormatDeserializationSchema::deserialize, which leads to data loss.

Selecting the value and metadata fields at the same time avoids the data loss because, when a metadata field is selected, a new RowData object is returned (see code [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]). When only the value field is selected, the RowData object returned by the format's DeserializationSchema is reused directly (see code [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).

To solve this problem, I think we should disable object reuse in RawFormatDeserializationSchema.
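The failure mode in steps 3 and 4 can be reproduced without threads by forcing the ordering that the race produces: the fetcher enqueues several records before the main thread drains any of them. This is a hypothetical, single-threaded simulation; `MutableRow` stands in for a reused `RowData` and the `ArrayDeque` for the ElementQueue, and none of the classes below are Flink APIs. In the real job the overwrite depends on thread timing, so the loss is intermittent rather than deterministic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

/** Hypothetical simulation of object reuse across a hand-off queue; not Flink code. */
final class ObjectReuseDemo {
    /** Minimal mutable holder standing in for a reused RowData. */
    static final class MutableRow {
        byte[] value;
    }

    /** Returns the same instance on every call, like a deserializer with object reuse enabled. */
    static final class ReusingDeserializer {
        private final MutableRow reuse = new MutableRow();

        MutableRow deserialize(byte[] message) {
            reuse.value = message; // overwrites the row that earlier queue entries still point to
            return reuse;
        }
    }

    /** Allocates a fresh row per record, i.e. object reuse disabled. */
    static final class CopyingDeserializer {
        MutableRow deserialize(byte[] message) {
            MutableRow row = new MutableRow();
            row.value = message;
            return row;
        }
    }

    /** The "fetcher" enqueues every message before the "main thread" drains the queue. */
    static List<String> fetchThenDrain(Function<byte[], MutableRow> deserializer, String... messages) {
        Queue<MutableRow> elementQueue = new ArrayDeque<>();
        for (String message : messages) {
            elementQueue.add(deserializer.apply(message.getBytes(StandardCharsets.UTF_8)));
        }
        List<String> consumed = new ArrayList<>();
        while (!elementQueue.isEmpty()) {
            consumed.add(new String(elementQueue.poll().value, StandardCharsets.UTF_8));
        }
        return consumed;
    }
}
```

With inputs `"a", "b"`, the reusing variant yields `[b, b]` (record `a` is lost because both queue entries point to one object), while the copying variant yields `[a, b]`.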
Re:Re: [DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableAPI
Hi, Jane, thanks for driving this.

IMO, it is important to keep the semantics consistent between the Table API and SQL, not only for maintenance but also for user experience. But for users, the impact of this modification is a bit large. Is it possible to consider introducing a deprecated option that allows users to fall back to the previous behavior (with fallback enabled by default), and then officially deprecate it in Flink 2.0?

BTW, the Jira FLINK-33217 [1] is caused by Flink SQL not handling the nullable attribute of the ROW type in the way Calcite expects. However, fixing it will also have a relatively large impact. We may also need to check the code on the SQL side.

[1] https://issues.apache.org/jira/browse/FLINK-33217

--
Best!
Xuyang

On 2023-12-25 10:16:28, "Shengkai Fang" wrote:
>Thanks for Jane and Sergey's proposal!
>
>+1 to correct the Table API behavior.
>
>I have one question: Is the influence only limited to the RowType? Does the Map or Array type have the same problems?
>
>Best,
>Shengkai
>[DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableA
>
>On Fri, Dec 22, 2023 at 17:40, Jane Chan wrote:
>
>> Dear devs,
>>
>> Several issues [1][2][3] have been identified regarding the inconsistent treatment of ROW type nullability between SQL and TableAPI. However, addressing these discrepancies might necessitate updates to the public API. Therefore, I'm initiating this discussion to engage the community in forging a unified approach to resolve these challenges.
>>
>> To summarize, SQL prohibits ROW types such as ROW> STRING>, which is implicitly rewritten to ROW by Calcite[4]. In contrast, TableAPI permits such types, resulting in inconsistency.
>> For a comprehensive issue breakdown, please refer to the comment of [1].
>>
>> According to CALCITE-2464[4], ROW is not a valid type. As a result, the behavior of TableAPI is incorrect and needs to be consistent with SQL, which means the implementation of the following public APIs needs to be changed.
>>
>>- RowType#copy(boolean nullable) should also set the inner fields to nullable if nullable is true.
>>- RowType's constructor should also check nullability.
>>- FieldsDataType#nullable() should also set the inner fields to nullable.
>>
>> In addition to the necessary changes in the implementation of the aforementioned APIs, the semantics of the following APIs are also impacted.
>>
>>- `DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT().notNull())).notNull()` cannot create a type like `ROW<f0 INT NOT NULL> NOT NULL`.
>>- Idempotence for chained calls `notNull().nullable().notNull()` for `Row` cannot be maintained.
>>
>> Sergey and I have engaged in a discussion regarding the solution [1]. I'm interested in gathering additional perspectives on the fix.
>>
>> Looking forward to your ideas!
>>
>> Best,
>> Jane
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-31830
>> [2] https://issues.apache.org/jira/browse/FLINK-31829
>> [3] https://issues.apache.org/jira/browse/FLINK-33217
>> [4] https://issues.apache.org/jira/browse/CALCITE-2464
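The two impacted semantics Jane lists follow directly from the proposed rule that a nullable ROW forces its fields to be nullable. The toy model below makes that concrete. `Field` and `Row` are hypothetical records, not Flink's `RowType`, `FieldsDataType`, or `DataTypes`; constructing a nullable `Row` relaxes its fields, mirroring the proposed constructor check and `copy(true)` behavior.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical, simplified model of the proposed ROW nullability rule; not Flink's API. */
final class RowNullabilityDemo {
    record Field(String name, String type, boolean isNullable) {
        Field withNullable(boolean nullable) {
            return new Field(name, type, nullable);
        }

        @Override
        public String toString() {
            return name + " " + type + (isNullable ? "" : " NOT NULL");
        }
    }

    record Row(List<Field> fields, boolean isNullable) {
        Row {
            // Proposed rule: a nullable ROW must not contain NOT NULL fields, so they are relaxed.
            if (isNullable) {
                fields = fields.stream().map(f -> f.withNullable(true)).collect(Collectors.toList());
            }
            fields = List.copyOf(fields);
        }

        /** Mirrors the proposed copy(false): inner fields are left untouched. */
        Row notNull() {
            return new Row(fields, false);
        }

        /** Mirrors the proposed copy(true): the constructor relaxes every inner field. */
        Row nullable() {
            return new Row(fields, true);
        }

        @Override
        public String toString() {
            String body = fields.stream().map(Field::toString).collect(Collectors.joining(", "));
            return "ROW<" + body + ">" + (isNullable ? "" : " NOT NULL");
        }
    }
}
```

In this model, a row that starts out nullable (as `DataTypes.ROW(...)` does before `.notNull()` is applied) and is then made NOT NULL prints as `ROW<f0 INT> NOT NULL`, not `ROW<f0 INT NOT NULL> NOT NULL`. Likewise, starting from `ROW<f0 INT NOT NULL> NOT NULL`, the chain `notNull().nullable().notNull()` ends at `ROW<f0 INT> NOT NULL`, so the chain is no longer idempotent.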
[jira] [Created] (FLINK-33933) SerializedThrowable will be java.lang.StackOverflowError when AsyncLookupFunction throw an exception
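KarlManong created FLINK-33933:
---

Summary: SerializedThrowable will be java.lang.StackOverflowError when AsyncLookupFunction throw an exception
Key: FLINK-33933
URL: https://issues.apache.org/jira/browse/FLINK-33933
Project: Flink
Issue Type: Bug
Environment: Tested on 1.16 through 1.18, with the same behavior
Reporter: KarlManong

Here is a simple example:
{code:java}
// example
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;

public class TableA implements LookupTableSource {
    @Nullable private final LookupCache cache;

    public TableA(@Nullable LookupCache cache) {
        this.cache = cache;
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        LookupFunctionA lookupFunction = new LookupFunctionA();
        if (cache != null) {
            // Async lookup wrapped with a partial cache.
            return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
        } else {
            return AsyncLookupFunctionProvider.of(lookupFunction);
        }
    }

    @Override
    public DynamicTableSource copy() {
        return new TableA(cache);
    }

    @Override
    public String asSummaryString() {
        return "Async Table";
    }
}

public class LookupFunctionA extends AsyncLookupFunction {
    @Override
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
        // Every lookup fails; per the report, this failure is what leads to the StackOverflowError.
        future.completeExceptionally(new IOException("request failed"));
        return future;
    }
}
{code}
When using TableA, a StackOverflowError occurs.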
[jira] [Created] (FLINK-33932) Support retry mechanism for rocksdb uploader
Guojun Li created FLINK-33932:
---

Summary: Support retry mechanism for rocksdb uploader
Key: FLINK-33932
URL: https://issues.apache.org/jira/browse/FLINK-33932
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Guojun Li

The RocksDB uploader throws an exception and declines the current checkpoint if an error occurs while uploading to a remote file system such as HDFS. The exception is as follows:

2023-12-19 08:46:00,197 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 2 by task 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job a025f19e at application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> Calc[133] (184/500)#0.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
	at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
	... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.net.ConnectException: Connection timed out
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) ~[?:?]
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
	at
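The root cause at the bottom of the trace is `java.net.ConnectException: Connection timed out`, the kind of transient failure a retry could absorb instead of declining the whole checkpoint. A minimal sketch of retrying one upload with exponential backoff follows. `UploadRetry` and `callWithRetry` are hypothetical names, not existing Flink classes or configuration options, and the sketch assumes each attempt writes to a fresh output stream so a retry never continues a partially written file.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical retry helper for one upload; not an existing Flink class or configuration option. */
final class UploadRetry {
    private UploadRetry() {}

    /**
     * Calls upload up to maxAttempts times, sleeping between attempts with a backoff that starts at
     * initialBackoff and doubles each time. If every attempt fails, the last failure is thrown, and
     * each failure carries the previous one as a suppressed exception.
     */
    static <T> T callWithRetry(Callable<T> upload, int maxAttempts, Duration initialBackoff)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        long backoffMillis = initialBackoff.toMillis();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return upload.call();
            } catch (InterruptedException e) {
                // Cancellation is not a transient error: restore the flag and stop retrying.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                if (lastFailure != null) {
                    e.addSuppressed(lastFailure);
                }
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                    backoffMillis *= 2;
                }
            }
        }
        throw lastFailure;
    }
}
```

Keeping earlier failures as suppressed exceptions means that, if the checkpoint is still declined after the final attempt, the log shows every attempt's cause rather than only the last one.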
Re: [DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableAPI
Thanks for Jane and Sergey's proposal!

+1 to correct the Table API behavior.

I have one question: Is the influence only limited to the RowType? Does the Map or Array type have the same problems?

Best,
Shengkai

[DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableA

On Fri, Dec 22, 2023 at 17:40, Jane Chan wrote:
> Dear devs,
>
> Several issues [1][2][3] have been identified regarding the inconsistent treatment of ROW type nullability between SQL and TableAPI. However, addressing these discrepancies might necessitate updates to the public API. Therefore, I'm initiating this discussion to engage the community in forging a unified approach to resolve these challenges.
>
> To summarize, SQL prohibits ROW types such as ROW STRING>, which is implicitly rewritten to ROW by Calcite[4]. In contrast, TableAPI permits such types, resulting in inconsistency.
> For a comprehensive issue breakdown, please refer to the comment of [1].
>
> According to CALCITE-2464[4], ROW is not a valid type. As a result, the behavior of TableAPI is incorrect and needs to be consistent with SQL, which means the implementation of the following public APIs needs to be changed.
>
>- RowType#copy(boolean nullable) should also set the inner fields to nullable if nullable is true.
>- RowType's constructor should also check nullability.
>- FieldsDataType#nullable() should also set the inner fields to nullable.
>
> In addition to the necessary changes in the implementation of the aforementioned APIs, the semantics of the following APIs are also impacted.
>
>- `DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT().notNull())).notNull()` cannot create a type like `ROW<f0 INT NOT NULL> NOT NULL`.
>- Idempotence for chained calls `notNull().nullable().notNull()` for `Row` cannot be maintained.
>
> Sergey and I have engaged in a discussion regarding the solution [1]. I'm interested in gathering additional perspectives on the fix.
>
> Looking forward to your ideas!
>
> Best,
> Jane
>
> [1] https://issues.apache.org/jira/browse/FLINK-31830
> [2] https://issues.apache.org/jira/browse/FLINK-31829
> [3] https://issues.apache.org/jira/browse/FLINK-33217
> [4] https://issues.apache.org/jira/browse/CALCITE-2464