[flink] branch master updated (ade583cf804 -> e9353319ad6)
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from ade583cf804 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer new 798a20a2c94 [FLINK-32978][flink-core] Deprecate RichFunction#open(Configuration parameters) new e9353319ad6 [FLINK-32978][flink-core] Migrate the usage of RichFunction#open(Configuration parameters) to RichFunction#open(OpenContext openContext) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../docs/dev/datastream/operators/asyncio.md | 2 +- .../dev/datastream/operators/process_function.md | 2 +- docs/content.zh/docs/dev/table/data_stream_api.md | 2 +- docs/content.zh/docs/dev/table/sourcesSinks.md | 2 +- docs/content.zh/docs/libs/state_processor_api.md | 6 +-- docs/content.zh/docs/try-flink/datastream.md | 6 +-- .../docs/dev/datastream/operators/asyncio.md | 2 +- .../dev/datastream/operators/process_function.md | 2 +- docs/content/docs/dev/table/data_stream_api.md | 2 +- docs/content/docs/dev/table/sourcesSinks.md| 2 +- docs/content/docs/libs/state_processor_api.md | 6 +-- docs/content/docs/try-flink/datastream.md | 6 +-- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 8 +-- .../source/reader/CoordinatedSourceITCase.java | 4 +- .../batch/compact/BatchPartitionCommitterSink.java | 4 +- .../compact/BatchPartitionCommitterSinkTest.java | 4 +- .../hive/factories/TestLockTableSinkFactory.java | 4 +- .../mapred/HadoopMapFunction.java | 8 +-- .../mapred/HadoopReduceCombineFunction.java| 8 +-- .../mapred/HadoopReduceFunction.java | 8 +-- .../flink/api/common/aggregators/Aggregator.java | 2 +- .../api/common/functions/AbstractRichFunction.java | 4 +- 
.../api/common/functions/DefaultOpenContext.java | 10 ++-- .../flink/api/common/functions/OpenContext.java| 12 ++--- .../common/functions/RichAggregateFunction.java| 3 +- .../api/common/functions/RichCoGroupFunction.java | 3 +- .../api/common/functions/RichCrossFunction.java| 3 +- .../api/common/functions/RichFilterFunction.java | 3 +- .../api/common/functions/RichFlatJoinFunction.java | 3 +- .../api/common/functions/RichFlatMapFunction.java | 3 +- .../flink/api/common/functions/RichFunction.java | 55 +++ .../common/functions/RichGroupCombineFunction.java | 3 +- .../common/functions/RichGroupReduceFunction.java | 3 +- .../api/common/functions/RichJoinFunction.java | 3 +- .../api/common/functions/RichMapFunction.java | 3 +- .../common/functions/RichMapPartitionFunction.java | 3 +- .../api/common/functions/RichReduceFunction.java | 3 +- .../api/common/functions/util/FunctionUtils.java | 6 +-- .../common/operators/base/BulkIterationBase.java | 4 +- .../common/operators/base/CoGroupOperatorBase.java | 3 +- .../operators/base/CoGroupRawOperatorBase.java | 3 +- .../common/operators/base/CrossOperatorBase.java | 3 +- .../common/operators/base/FilterOperatorBase.java | 3 +- .../common/operators/base/FlatMapOperatorBase.java | 3 +- .../operators/base/GroupCombineOperatorBase.java | 3 +- .../operators/base/GroupReduceOperatorBase.java| 3 +- .../operators/base/InnerJoinOperatorBase.java | 3 +- .../api/common/operators/base/MapOperatorBase.java | 3 +- .../operators/base/MapPartitionOperatorBase.java | 3 +- .../operators/base/OuterJoinOperatorBase.java | 3 +- .../common/operators/base/ReduceOperatorBase.java | 3 +- .../base/FlatMapOperatorCollectionTest.java| 4 +- .../operators/base/InnerJoinOperatorBaseTest.java | 4 +- .../api/common/operators/base/MapOperatorTest.java | 4 +- .../operators/base/OuterJoinOperatorBaseTest.java | 4 +- .../operators/base/PartitionMapOperatorTest.java | 4 +- .../api/java/typeutils/TypeExtractorTest.java | 4 +- 
.../tests/BlockingIncrementingMapFunction.java | 4 +- .../streaming/tests/SemanticsCheckMapper.java | 4 +- .../streaming/tests/SlidingWindowCheckMapper.java | 4 +- .../NettyShuffleMemoryControlTestProgram.java | 4 +- .../tests/queryablestate/QsStateProducer.java | 8 +-- .../RocksDBStateMemoryControlTestProgram.java | 8 +-- .../apache/flink/test/StatefulStreamingJob.java| 6 +-- .../flink/examples/java/clustering/KMeans.java | 4 +- .../apache/flink/examples/java/distcp/DistCp.java | 4 +- .../relational/EmptyFieldsCountAccumulator.java| 6 +-- .../streaming/examples/async/AsyncIOExample.java | 4 +- .../streaming/examples/gpu/MatrixVectorMul.java| 4
[flink] 01/02: [FLINK-32978][flink-core] Deprecate RichFunction#open(Configuration parameters)
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 798a20a2c948db976ff7ef9e24f92191834bd921 Author: Wencong Liu AuthorDate: Fri Sep 8 11:04:27 2023 +0800 [FLINK-32978][flink-core] Deprecate RichFunction#open(Configuration parameters) --- .../api/common/functions/DefaultOpenContext.java | 28 +++ .../flink/api/common/functions/OpenContext.java| 29 .../flink/api/common/functions/RichFunction.java | 55 ++ 3 files changed, 112 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/DefaultOpenContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/DefaultOpenContext.java new file mode 100644 index 000..21fca4e4c31 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/DefaultOpenContext.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** The default implementation of {@link OpenContext}. 
*/ +@PublicEvolving +public class DefaultOpenContext implements OpenContext { + +public static final OpenContext INSTANCE = new DefaultOpenContext(); +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java new file mode 100644 index 000..4ff5484b3b0 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * The {@link OpenContext} interface provides necessary information required by the {@link + * RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be + * used to add more methods without affecting the signature of {@code RichFunction#open}. 
+ */ +@PublicEvolving +public interface OpenContext {} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java index 240b228557f..ae83fb30f2b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.functions; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; /** @@ -61,9 +62,63 @@ public interface RichFunction extends Function { * When the runtime catches an exception, it aborts the task and lets the fail-over logic * decide whether to retry the task execution. * @see org.apache.flink.configuration.Configuration + * @deprecated This method is deprecated since Flink 1.19. The users are recommended to + * implement {@code open(OpenContext openContext)} and implement {@code open(Configuration + * parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext + * openContext)}, the {@code
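The deprecation note above recommends implementing open(OpenContext openContext) and leaving open(Configuration parameters) with an empty body. A minimal sketch of that migration follows. It uses stand-in types declared in the block itself, not the Flink classes. The default body of open(OpenContext), which delegates to the legacy method, is an assumption, since the quoted Javadoc is cut off before it describes the call order. LegacyFunction, MigratedFunction and openQuietly are hypothetical names.

```java
// Stand-in types that mimic the shapes in the diff above; these are NOT the Flink classes.
final class Configuration {}

interface OpenContext {}

final class DefaultOpenContext implements OpenContext {
    static final OpenContext INSTANCE = new DefaultOpenContext();
}

interface RichFunction {
    /** Legacy hook, deprecated in favour of open(OpenContext). */
    @Deprecated
    void open(Configuration parameters) throws Exception;

    /**
     * New hook. Assumption: the default body delegates to the legacy hook so that
     * functions which only implement open(Configuration) keep working.
     */
    default void open(OpenContext openContext) throws Exception {
        open(new Configuration());
    }
}

/** Hypothetical function that has not been migrated yet. */
final class LegacyFunction implements RichFunction {
    boolean opened;

    @Override
    public void open(Configuration parameters) {
        opened = true;
    }
}

/** Hypothetical migrated function: empty legacy hook, logic in the new hook. */
final class MigratedFunction implements RichFunction {
    String openedVia = "none";

    @Override
    public void open(Configuration parameters) {
        // intentionally empty, as the deprecation note recommends
    }

    @Override
    public void open(OpenContext openContext) {
        openedVia = "OpenContext";
    }
}

class OpenContextMigrationSketch {
    /** Calls the new hook the way a runtime might, wrapping checked exceptions. */
    static void openQuietly(RichFunction function) {
        try {
            function.open(DefaultOpenContext.INSTANCE);
        } catch (Exception e) {
            throw new IllegalStateException("open failed", e);
        }
    }

    public static void main(String[] args) {
        LegacyFunction legacy = new LegacyFunction();
        MigratedFunction migrated = new MigratedFunction();
        openQuietly(legacy);
        openQuietly(migrated);
        System.out.println("legacy opened: " + legacy.opened);
        System.out.println("migrated opened via: " + migrated.openedVia);
    }
}
```

Under this assumption the caller only ever invokes open(OpenContext), so both old and new implementations are reached through one entry point, and methods can later be added to OpenContext without changing the signature of open.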
[flink] 02/02: [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit d100ab65367fe0b3d74a9901bcaaa26049c996b0 Author: Yuxin Tan AuthorDate: Tue Aug 22 15:34:52 2023 +0800 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer This closes #23255 --- .../network/partition/BufferReaderWriterUtil.java | 2 +- .../partition/hybrid/HsFileDataIndexImpl.java | 7 +- .../hybrid/index/FileDataIndexRegionHelper.java| 5 +- .../hybrid/index/FileRegionWriteReadUtils.java | 9 +- .../hybrid/tiered/file/PartitionFileReader.java| 79 - .../file/ProducerMergedPartitionFileIndex.java | 36 +- .../file/ProducerMergedPartitionFileReader.java| 383 ++--- .../file/ProducerMergedPartitionFileWriter.java| 3 +- .../tiered/file/SegmentPartitionFileReader.java| 29 +- .../hybrid/tiered/tier/disk/DiskIOScheduler.java | 98 -- .../tier/remote/RemoteTierConsumerAgent.java | 17 +- .../partition/hybrid/HybridShuffleTestUtils.java | 9 +- .../hybrid/index/FileDataIndexCacheTest.java | 2 +- .../hybrid/index/FileRegionWriteReadUtilsTest.java | 9 +- .../hybrid/index/TestingFileDataIndexRegion.java | 22 +- .../hybrid/tiered/file/DiskIOSchedulerTest.java| 8 +- .../file/ProducerMergedPartitionFileIndexTest.java | 2 +- .../ProducerMergedPartitionFileReaderTest.java | 132 --- .../file/SegmentPartitionFileReaderTest.java | 25 +- .../tiered/file/TestingPartitionFileReader.java| 21 +- 20 files changed, 642 insertions(+), 256 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java index 863be5b5b44..f89994c2d44 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java @@ -252,7 +252,7 @@ public final class BufferReaderWriterUtil { } } -static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException { +public static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException { // the post-checked loop here gets away with one less check in the normal case do { if (channel.read(b) == -1) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java index 4b4221225cf..25012ffa8f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java @@ -220,10 +220,15 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { } @Override -public long getRegionFileOffset() { +public long getRegionStartOffset() { return regionFileOffset; } +@Override +public long getRegionEndOffset() { +throw new UnsupportedOperationException("This method is not supported."); +} + @Override public int getNumBuffers() { return numBuffers; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java index c0d4270e109..77638bb3156 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java @@ -77,7 +77,10 @@ public interface FileDataIndexRegionHelperNote that the implementation of the interface should 
strongly bind with the implementation + * of {@link PartitionFileReader}. + */ +interface ReadProgress {} + +/** + * A wrapper class of the reading buffer result, including the read buffers, the hint of + * continue reading, and the read progress, etc. + */ +class ReadBufferResult { + +/** The read buffers. */ +private final List readBuffers; + +/** + * A hint to determine whether the caller may continue reading the following buffers. Note + * that this hint is merely a recommendation and not obligatory. Following the hint while + * reading buffers may improve performance. + */ +
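The idea in the commit title, reading one large buffer and slicing it into several small ones, can be sketched with plain java.nio. This is an illustration only, not the ProducerMergedPartitionFileReader code: the record layout (a 4-byte length prefix followed by the payload) and the names SliceLargeBufferSketch and sliceRecords are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class SliceLargeBufferSketch {

    /**
     * Splits a buffer holding [int length][payload] records into zero-copy views.
     * A trailing partial record is left untouched so a later read can complete it.
     */
    static List<ByteBuffer> sliceRecords(ByteBuffer large) {
        List<ByteBuffer> slices = new ArrayList<>();
        // duplicate() shares the bytes but has its own position and limit
        ByteBuffer cursor = large.duplicate();
        while (cursor.remaining() >= Integer.BYTES) {
            // absolute read: peek at the length without moving the position
            int length = cursor.getInt(cursor.position());
            if (length < 0 || length > cursor.remaining() - Integer.BYTES) {
                break; // corrupt or incomplete record
            }
            cursor.position(cursor.position() + Integer.BYTES);
            ByteBuffer slice = cursor.slice(); // view starting at the payload
            slice.limit(length);               // restrict the view to this record
            slices.add(slice);
            cursor.position(cursor.position() + length);
        }
        return slices;
    }

    public static void main(String[] args) {
        ByteBuffer large = ByteBuffer.allocate(64);
        large.putInt(3).put(new byte[] {1, 2, 3});
        large.putInt(2).put(new byte[] {4, 5});
        large.flip();

        List<ByteBuffer> slices = sliceRecords(large);
        for (ByteBuffer slice : slices) {
            System.out.println("record with " + slice.remaining() + " bytes");
        }
    }
}
```

The point of the technique is that one large read replaces many small ones, and the slices share the backing memory, so no payload bytes are copied.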
[flink] 01/02: [hotfix][network] Fix the close method for the tiered producer client
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae647e2b489f22a32b25d3625e5a7d2090e85f06
Author: Yuxin Tan
AuthorDate: Mon Aug 14 19:45:34 2023 +0800

    [hotfix][network] Fix the close method for the tiered producer client
---
 .../network/partition/hybrid/tiered/shuffle/TieredResultPartition.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
index d3aae631620..f7070165fa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
@@ -180,13 +180,13 @@ public class TieredResultPartition extends ResultPartition {
     @Override
     public void finish() throws IOException {
         broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+        tieredStorageProducerClient.close();
         checkState(!isReleased(), "Result partition is already released.");
         super.finish();
     }

     @Override
     public void close() {
-        tieredStorageProducerClient.close();
         super.close();
     }
[flink] branch release-1.18 updated (f2584a1df36 -> d100ab65367)
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

    from f2584a1df36 [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)
     new ae647e2b489 [hotfix][network] Fix the close method for the tiered producer client
     new d100ab65367 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../network/partition/BufferReaderWriterUtil.java | 2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java | 7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java | 5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java | 9 +-
 .../hybrid/tiered/file/PartitionFileReader.java | 79 -
 .../file/ProducerMergedPartitionFileIndex.java | 36 +-
 .../file/ProducerMergedPartitionFileReader.java | 383 ++---
 .../file/ProducerMergedPartitionFileWriter.java | 3 +-
 .../tiered/file/SegmentPartitionFileReader.java | 29 +-
 .../tiered/shuffle/TieredResultPartition.java | 2 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java | 98 --
 .../tier/remote/RemoteTierConsumerAgent.java | 17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java | 9 +-
 .../hybrid/index/FileDataIndexCacheTest.java | 2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java | 9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java | 22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java | 8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java | 2 +-
 .../ProducerMergedPartitionFileReaderTest.java | 132 ---
 .../file/SegmentPartitionFileReaderTest.java | 25 +-
 .../tiered/file/TestingPartitionFileReader.java | 21 +-
 21 files changed, 643 insertions(+), 257 deletions(-)
[flink] 01/02: [hotfix][network] Fix the close method for the tiered producer client
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e41b0b2676166de582c4dd9f0289c585c31aca3b
Author: Yuxin Tan
AuthorDate: Mon Aug 14 19:45:34 2023 +0800

    [hotfix][network] Fix the close method for the tiered producer client
---
 .../network/partition/hybrid/tiered/shuffle/TieredResultPartition.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
index d3aae631620..f7070165fa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
@@ -180,13 +180,13 @@ public class TieredResultPartition extends ResultPartition {
     @Override
     public void finish() throws IOException {
         broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+        tieredStorageProducerClient.close();
         checkState(!isReleased(), "Result partition is already released.");
         super.finish();
     }

     @Override
     public void close() {
-        tieredStorageProducerClient.close();
         super.close();
     }
[flink] branch master updated (649b7fe197c -> ade583cf804)
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 649b7fe197c [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3
     new e41b0b26761 [hotfix][network] Fix the close method for the tiered producer client
     new ade583cf804 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../network/partition/BufferReaderWriterUtil.java | 2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java | 7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java | 5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java | 9 +-
 .../hybrid/tiered/file/PartitionFileReader.java | 79 -
 .../file/ProducerMergedPartitionFileIndex.java | 36 +-
 .../file/ProducerMergedPartitionFileReader.java | 383 ++---
 .../file/ProducerMergedPartitionFileWriter.java | 3 +-
 .../tiered/file/SegmentPartitionFileReader.java | 29 +-
 .../tiered/shuffle/TieredResultPartition.java | 2 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java | 98 --
 .../tier/remote/RemoteTierConsumerAgent.java | 17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java | 9 +-
 .../hybrid/index/FileDataIndexCacheTest.java | 2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java | 9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java | 22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java | 8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java | 2 +-
 .../ProducerMergedPartitionFileReaderTest.java | 132 ---
 .../file/SegmentPartitionFileReaderTest.java | 25 +-
 .../tiered/file/TestingPartitionFileReader.java | 21 +-
 21 files changed, 643 insertions(+), 257 deletions(-)
[flink] 02/02: [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ade583cf80478ac60e9b83fc0a97f36bc2b26f1c Author: Yuxin Tan AuthorDate: Tue Aug 22 15:34:52 2023 +0800 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer This closes #23255 --- .../network/partition/BufferReaderWriterUtil.java | 2 +- .../partition/hybrid/HsFileDataIndexImpl.java | 7 +- .../hybrid/index/FileDataIndexRegionHelper.java| 5 +- .../hybrid/index/FileRegionWriteReadUtils.java | 9 +- .../hybrid/tiered/file/PartitionFileReader.java| 79 - .../file/ProducerMergedPartitionFileIndex.java | 36 +- .../file/ProducerMergedPartitionFileReader.java| 383 ++--- .../file/ProducerMergedPartitionFileWriter.java| 3 +- .../tiered/file/SegmentPartitionFileReader.java| 29 +- .../hybrid/tiered/tier/disk/DiskIOScheduler.java | 98 -- .../tier/remote/RemoteTierConsumerAgent.java | 17 +- .../partition/hybrid/HybridShuffleTestUtils.java | 9 +- .../hybrid/index/FileDataIndexCacheTest.java | 2 +- .../hybrid/index/FileRegionWriteReadUtilsTest.java | 9 +- .../hybrid/index/TestingFileDataIndexRegion.java | 22 +- .../hybrid/tiered/file/DiskIOSchedulerTest.java| 8 +- .../file/ProducerMergedPartitionFileIndexTest.java | 2 +- .../ProducerMergedPartitionFileReaderTest.java | 132 --- .../file/SegmentPartitionFileReaderTest.java | 25 +- .../tiered/file/TestingPartitionFileReader.java| 21 +- 20 files changed, 642 insertions(+), 256 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java index 863be5b5b44..f89994c2d44 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java @@ -252,7 +252,7 @@ public final class BufferReaderWriterUtil { } } -static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException { +public static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException { // the post-checked loop here gets away with one less check in the normal case do { if (channel.read(b) == -1) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java index 4b4221225cf..25012ffa8f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java @@ -220,10 +220,15 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { } @Override -public long getRegionFileOffset() { +public long getRegionStartOffset() { return regionFileOffset; } +@Override +public long getRegionEndOffset() { +throw new UnsupportedOperationException("This method is not supported."); +} + @Override public int getNumBuffers() { return numBuffers; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java index c0d4270e109..77638bb3156 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java @@ -77,7 +77,10 @@ public interface FileDataIndexRegionHelperNote that the implementation of the interface should 
strongly bind with the implementation + * of {@link PartitionFileReader}. + */ +interface ReadProgress {} + +/** + * A wrapper class of the reading buffer result, including the read buffers, the hint of + * continue reading, and the read progress, etc. + */ +class ReadBufferResult { + +/** The read buffers. */ +private final List readBuffers; + +/** + * A hint to determine whether the caller may continue reading the following buffers. Note + * that this hint is merely a recommendation and not obligatory. Following the hint while + * reading buffers may improve performance. + */ +
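The readByteBufferFully hunk above shows only the start of a post-checked read loop. A self-contained sketch of that pattern with java.nio is below. The end-of-file handling (throwing EOFException) and the names ReadFullySketch, readFully and roundTrip are assumptions, because the diff is truncated before the loop body ends.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ReadFullySketch {

    /** Reads from the channel until the target buffer has no remaining space. */
    static void readFully(FileChannel channel, ByteBuffer target) throws IOException {
        // post-checked loop: the common case (one read fills the buffer) needs one check only
        do {
            if (channel.read(target) == -1) {
                // assumption: treat a premature end of file as an error
                throw new EOFException("file ended before the buffer was filled");
            }
        } while (target.hasRemaining());
    }

    /** Hypothetical helper: writes data to a temp file and reads it back fully. */
    static byte[] roundTrip(byte[] data) {
        try {
            Path file = Files.createTempFile("read-fully", ".bin");
            try {
                Files.write(file, data);
                try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                    ByteBuffer buffer = ByteBuffer.allocate(data.length);
                    readFully(channel, buffer);
                    return buffer.array();
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] back = roundTrip(new byte[] {10, 20, 30, 40});
        System.out.println("bytes read back: " + back.length);
    }
}
```

A single FileChannel.read call may return fewer bytes than requested, which is why the loop is needed at all; the do/while form simply avoids checking hasRemaining() before the first read.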
[flink] branch release-1.18 updated: [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.18 by this push:
     new f2584a1df36 [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)
f2584a1df36 is described below

commit f2584a1df364a14ff50b5a52fe7cf5e38d4cdc9a
Author: yunhong <337361...@qq.com>
AuthorDate: Tue Sep 12 11:08:48 2023 +0800

    [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)

    Co-authored-by: zhengyunhong.zyh
---
 .../plan/utils/JavaUserDefinedAggFunctions.java | 80 ++
 .../runtime/stream/sql/AggregateITCase.scala | 30 +++-
 .../table/runtime/typeutils/TypeCheckUtils.java | 12 +++-
 3 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java
index 64cb9bbd2bd..4e722be8ee3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java
@@ -25,7 +25,11 @@ import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.functions.AggregateFunction;

+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;

 /** Test aggregator functions. */
 public class JavaUserDefinedAggFunctions {
@@ -421,4 +425,80 @@ public class JavaUserDefinedAggFunctions {
             return Tuple1.of(0L);
         }
     }
+
+    /** User defined pojo object. */
+    public static class TestObject {
+        private final String a;
+
+        public TestObject(String a) {
+            this.a = a;
+        }
+
+        public String getA() {
+            return a;
+        }
+    }
+
+    /** User defined object. */
+    public static class UserDefinedObject {
+        // List with user defined pojo object.
+        public List<TestObject> testObjectList = new ArrayList<>();
+        // Map with user defined pojo object.
+        public Map<String, TestObject> testObjectMap = new HashMap<>();
+    }
+
+    /** User defined UDAF whose value and acc is user defined complex pojo object. */
+    public static class UserDefinedObjectUDAF
+            extends AggregateFunction<UserDefinedObject, UserDefinedObject> {
+        private static final String KEY = "key";
+
+        @Override
+        public UserDefinedObject getValue(UserDefinedObject accumulator) {
+            return accumulator;
+        }
+
+        @Override
+        public UserDefinedObject createAccumulator() {
+            return new UserDefinedObject();
+        }
+
+        public void accumulate(UserDefinedObject acc, String a) {
+            if (a != null) {
+                acc.testObjectList.add(new TestObject(a));
+                acc.testObjectMap.put(KEY, new TestObject(a));
+            }
+        }
+
+        public void retract(UserDefinedObject acc, UserDefinedObject a) {
+            // do nothing.
+        }
+    }
+
+    /** User defined UDAF whose value and acc is user defined complex pojo object. */
+    public static class UserDefinedObjectUDAF2
+            extends AggregateFunction<String, UserDefinedObject> {
+        private static final String KEY = "key";
+
+        @Override
+        public String getValue(UserDefinedObject accumulator) {
+            if (accumulator.testObjectMap.containsKey(KEY)) {
+                return accumulator.testObjectMap.get(KEY).getA();
+            }
+            return null;
+        }
+
+        @Override
+        public UserDefinedObject createAccumulator() {
+            return new UserDefinedObject();
+        }
+
+        public void accumulate(UserDefinedObject acc, UserDefinedObject a) {
+            acc.testObjectList = a.testObjectList;
+            acc.testObjectMap = a.testObjectMap;
+        }
+
+        public void retract(UserDefinedObject acc, UserDefinedObject a) {
+            // do nothing
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 152c8d02be4..4df20e1615d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++
[flink] branch release-1.18 updated: [FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338)
This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new f66679bfe5f [FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338) f66679bfe5f is described below commit f66679bfe5f5b344eec71a7579504762cc3c04ae Author: yunhong <337361...@qq.com> AuthorDate: Tue Sep 12 11:07:23 2023 +0800 [FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338) Co-authored-by: zhengyunhong.zyh --- .../flink/table/planner/plan/reuse/ScanReuser.java | 16 +- .../table/planner/plan/reuse/ScanReuserUtils.java | 12 +- .../table/planner/plan/optimize/ScanReuseTest.java | 76 .../table/planner/plan/optimize/ScanReuseTest.xml | 214 +++-- 4 files changed, 243 insertions(+), 75 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 1ef3bf2f2b1..9128e110346 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -170,12 +170,18 @@ public class ScanReuser { // 2. Create new source. List specs = abilitySpecsWithoutEscaped(pickTable); -// 2.1 Apply projections -List newSpecs = new ArrayList<>(); +// 2.1 Create produced type. +// The source produced type is the input type into the runtime. The format looks as: +// PHYSICAL COLUMNS + METADATA COLUMNS. 
While re-compute the source ability specs with +// source metadata, we need to distinguish between schema type and produced type, which +// source ability specs use produced type instead of schema type. RowType originType = DynamicSourceUtils.createProducedType( pickTable.contextResolvedTable().getResolvedSchema(), pickTable.tableSource()); + +// 2.2 Apply projections +List newSpecs = new ArrayList<>(); RowType newSourceType = applyPhysicalAndMetadataPushDown( pickTable.tableSource(), @@ -190,15 +196,15 @@ public class ScanReuser { allMetaKeys); specs.addAll(newSpecs); -// 2.2 Watermark spec +// 2.3 Watermark spec Optional watermarkSpec = -getAdjustedWatermarkSpec(pickTable, newSourceType); +getAdjustedWatermarkSpec(pickTable, originType, newSourceType); if (watermarkSpec.isPresent()) { specs.add(watermarkSpec.get()); newSourceType = watermarkSpec.get().getProducedType().get(); } -// 2.3 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. +// 2.4 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. DynamicTableSourceSpec tableSourceSpec = new DynamicTableSourceSpec(pickTable.contextResolvedTable(), specs); ScanTableSource newTableSource = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java index 5c79cd4cd25..43a00d720a2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java @@ -170,24 +170,18 @@ public class ScanReuserUtils { /** Watermark push down must be after projection push down, so we need to adjust its index. 
*/ public static Optional getAdjustedWatermarkSpec( -TableSourceTable table, RowType newSourceType) { -RowType producedType = -(RowType) -table.contextResolvedTable() -.getResolvedSchema() -.toSourceRowDataType() -.getLogicalType(); +TableSourceTable table, RowType oldSourceType, RowType newSourceType) { for (SourceAbilitySpec spec : table.abilitySpecs()) { if (spec instanceof WatermarkPushDownSpec) { return Optional.of( adjustWatermarkIndex(
[flink] branch master updated: [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3
This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 649b7fe197c [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3 649b7fe197c is described below commit 649b7fe197c8b03cce9595adcfea33c8d708a8b4 Author: Shengkai <1059623...@qq.com> AuthorDate: Thu Sep 7 17:55:31 2023 +0800 [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3 This closes #23370 --- .../org/apache/flink/tests/hive/HiveITCase.java| 6 +- .../flink/tests/hive/containers/HiveContainer.java | 188 .../tests/hive/containers/HiveContainers.java | 246 + 3 files changed, 249 insertions(+), 191 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java index 5b310688f89..24a759887c5 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.test.util.SQLJobSubmission; -import org.apache.flink.tests.hive.containers.HiveContainer; +import org.apache.flink.tests.hive.containers.HiveContainers; import org.apache.flink.tests.util.flink.ClusterController; import org.apache.flink.tests.util.flink.FlinkResource; import org.apache.flink.tests.util.flink.FlinkResourceSetup; @@ -69,8 +69,8 @@ public class HiveITCase extends TestLogger { @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); @ClassRule -public static final HiveContainer 
HIVE_CONTAINER = -new HiveContainer( +public static final HiveContainers.HiveContainer HIVE_CONTAINER = +HiveContainers.createHiveContainer( Arrays.asList("hive_sink1", "hive_sink2", "h_table_sink1", "h_table_sink2")); private static final String HIVE_ADD_ONE_UDF_CLASS = "HiveAddOneFunc"; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java deleted file mode 100644 index d4ca4080023..000 --- a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tests.hive.containers; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.test.parameters.ParameterProperty; -import org.apache.flink.util.DockerImageVersions; - -import com.github.dockerjava.api.command.InspectContainerResponse; -import okhttp3.FormBody; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import org.junit.runner.Description; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.utility.DockerImageName; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import java.util.UUID; - -/** Test container for Hive. */ -public class HiveContainer extends GenericContainer { - -private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class); -public static final String HOST_NAME = "hadoop-master"; -public static final int HIVE_METASTORE_PORT = 9083; - -private static final boolean HIVE_310_OR_LATER = -
[flink-connector-elasticsearch] branch main updated: [FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (#39)
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git The following commit(s) were added to refs/heads/main by this push: new fef5c96 [FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (#39) fef5c96 is described below commit fef5c964f4a2fc8b61c482b0bc8504757581adfb Author: Yubin Li AuthorDate: Tue Sep 12 04:31:20 2023 +0800 [FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (#39) --- flink-connector-elasticsearch-base/pom.xml | 8 + .../elasticsearch/ElasticsearchApiCallBridge.java | 17 ++ .../table/ElasticsearchRowDataLookupFunction.java | 183 + .../elasticsearch/ElasticsearchSinkBaseTest.java | 12 ++ .../pom.xml| 5 + .../tests/ElasticsearchLookupE2ECase.java | 126 ++ .../pom.xml| 18 ++ .../tests/Elasticsearch6LookupE2ECase.java | 44 + .../pom.xml| 18 ++ .../tests/Elasticsearch7LookupE2ECase.java | 43 + .../table/Elasticsearch6DynamicSource.java | 135 +++ ...java => Elasticsearch6DynamicTableFactory.java} | 74 - .../Elasticsearch6ApiCallBridge.java | 23 ++- .../org.apache.flink.table.factories.Factory | 2 +- ... => Elasticsearch6DynamicTableFactoryTest.java} | 2 +- .../table/Elasticsearch6DynamicSinkITCase.java | 5 +- ... => Elasticsearch6DynamicTableFactoryTest.java} | 40 ++--- .../table/Elasticsearch7DynamicSource.java | 135 +++ ...java => Elasticsearch7DynamicTableFactory.java} | 74 - .../Elasticsearch7ApiCallBridge.java | 23 ++- .../org.apache.flink.table.factories.Factory | 2 +- ... => Elasticsearch7DynamicTableFactoryTest.java} | 2 +- .../table/Elasticsearch7DynamicSinkITCase.java | 5 +- ... 
=> Elasticsearch7DynamicTableFactoryTest.java} | 40 ++--- 24 files changed, 976 insertions(+), 60 deletions(-) diff --git a/flink-connector-elasticsearch-base/pom.xml b/flink-connector-elasticsearch-base/pom.xml index 9c65924..f859b1b 100644 --- a/flink-connector-elasticsearch-base/pom.xml +++ b/flink-connector-elasticsearch-base/pom.xml @@ -164,6 +164,14 @@ under the License. test + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + true + + diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java index 8061d04..aab5cf5 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java @@ -19,9 +19,11 @@ package org.apache.flink.streaming.connectors.elasticsearch; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.search.SearchRequest; import javax.annotation.Nullable; @@ -61,6 +63,21 @@ public interface ElasticsearchApiCallBridge extends Ser */ BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener); +/** + * Executes a search using the Search API. + * + * @param client the Elasticsearch client. + * @param searchRequest A request to execute search against one or more indices (or all). + */ +Tuple2 search(C client, SearchRequest searchRequest) throws IOException; + +/** + * Closes this client and releases any system resources associated with it. + * + * @param client the Elasticsearch client. 
+ */ +void close(C client) throws IOException; + /** * Extracts the cause of failure of a bulk item action. * diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java new file mode 100644 index 000..1cde271 --- /dev/null +++
[flink] branch master updated: [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832)
This is an automated email from the ASF dual-hosted git repository. zjureel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c33a6527d83 [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832) c33a6527d83 is described below commit c33a6527d8383dc571e0b648b8a29322416ab9d6 Author: Shammon FY AuthorDate: Mon Sep 11 20:34:34 2023 +0800 [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832) * [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway --- .../handler/statement/FetchResultsHandler.java | 19 ++- .../flink/table/gateway/rest/serde/ResultInfo.java | 14 +- .../rest/util/RowDataLocalTimeZoneConverter.java | 187 + .../utils/RowDataLocalTimeZoneConverterTest.java | 172 +++ .../apache/flink/table/jdbc/FlinkResultSet.java| 43 +++-- .../flink/table/jdbc/utils/ArrayFieldGetter.java | 121 - .../flink/table/jdbc/FlinkStatementTest.java | 52 -- 7 files changed, 464 insertions(+), 144 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java index 8cb4ff80802..4303158ea4e 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.table.gateway.rest.handler.statement; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils; @@ -39,13 +40,18 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsRowForm import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter; import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse; import org.apache.flink.table.gateway.rest.serde.ResultInfo; +import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter; import org.apache.flink.table.gateway.rest.util.RowFormat; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import javax.annotation.Nonnull; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** Handler to fetch results. */ public class FetchResultsHandler @@ -100,13 +106,24 @@ public class FetchResultsHandler return CompletableFuture.completedFuture( new NotReadyFetchResultResponse(nextResultUri)); } else { +RowDataLocalTimeZoneConverter timeZoneConverter = null; +if (rowFormat == RowFormat.JSON) { +List logicalTypeList = + resultSet.getResultSchema().getColumnDataTypes().stream() +.map(DataType::getLogicalType) +.collect(Collectors.toList()); +timeZoneConverter = +new RowDataLocalTimeZoneConverter( +logicalTypeList, + Configuration.fromMap(service.getSessionConfig(sessionHandle))); +} return CompletableFuture.completedFuture( new FetchResultResponseBodyImpl( resultType, resultSet.isQueryResult(), resultSet.getJobID(), resultSet.getResultKind(), -ResultInfo.createResultInfo(resultSet, rowFormat), +ResultInfo.createResultInfo(resultSet, rowFormat, timeZoneConverter), nextResultUri)); } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java index c34fb84dced..31957caca43 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java @@ -26,6 +26,7 @@ import
[flink] 02/02: [FLINK-32979] Migrate the usage of getDefaultTrigger(StreamExecutionEnvironment env) to getDefaultTrigger()
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 831cb8eff022db5543052c96716518c862c2
Author: Wencong Liu
AuthorDate: Tue Aug 29 12:37:07 2023 +0800

    [FLINK-32979] Migrate the usage of getDefaultTrigger(StreamExecutionEnvironment env) to getDefaultTrigger()

    This closes #23073
---
 .../org/apache/flink/state/api/WindowedOperatorTransformation.java | 2 +-
 .../org/apache/flink/state/api/WindowedStateTransformation.java | 2 +-
 .../apache/flink/streaming/api/datastream/AllWindowedStream.java | 2 +-
 .../org/apache/flink/streaming/api/datastream/WindowedStream.java | 2 +-
 .../api/windowing/assigners/DynamicEventTimeSessionWindows.java | 7 +++
 .../windowing/assigners/DynamicProcessingTimeSessionWindows.java | 7 +++
 .../streaming/api/windowing/assigners/EventTimeSessionWindows.java | 6 ++
 .../flink/streaming/api/windowing/assigners/GlobalWindows.java | 6 ++
 .../api/windowing/assigners/ProcessingTimeSessionWindows.java | 6 ++
 .../streaming/api/windowing/assigners/SlidingEventTimeWindows.java | 6 ++
 .../api/windowing/assigners/SlidingProcessingTimeWindows.java | 6 ++
 .../api/windowing/assigners/TumblingEventTimeWindows.java | 6 ++
 .../api/windowing/assigners/TumblingProcessingTimeWindows.java | 6 ++
 .../operators/windowing/DynamicEventTimeSessionWindowsTest.java | 5 +
 .../windowing/DynamicProcessingTimeSessionWindowsTest.java | 5 +
 .../runtime/operators/windowing/EventTimeSessionWindowsTest.java | 5 +
 .../streaming/runtime/operators/windowing/GlobalWindowsTest.java | 5 +
 .../runtime/operators/windowing/MergingWindowSetTest.java | 6 ++
 .../operators/windowing/ProcessingTimeSessionWindowsTest.java | 5 +
 .../runtime/operators/windowing/SlidingEventTimeWindowsTest.java | 5 +
 .../operators/windowing/SlidingProcessingTimeWindowsTest.java | 5 +
 .../runtime/operators/windowing/TumblingEventTimeWindowsTest.java | 5 +
 .../operators/windowing/TumblingProcessingTimeWindowsTest.java | 5 +
 23 files changed, 75 insertions(+), 40 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java
index f0fdc8cabd5..ed72d7eaf19 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java
@@ -84,7 +84,7 @@ public class WindowedOperatorTransformation {
         this.builder =
                 new WindowOperatorBuilder<>(
                         windowAssigner,
-                        windowAssigner.getDefaultTrigger(null),
+                        windowAssigner.getDefaultTrigger(),
                         input.getExecutionEnvironment().getConfig(),
                         input.getType(),
                         keySelector,
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
index fc0f85812e8..5d42ba7811c 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
@@ -77,7 +77,7 @@ public class WindowedStateTransformation {
         this.builder =
                 new WindowOperatorBuilder<>(
                         windowAssigner,
-                        windowAssigner.getDefaultTrigger(null),
+                        windowAssigner.getDefaultTrigger(),
                         input.getExecutionEnvironment().getConfig(),
                         input.getType(),
                         keySelector,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 33df648bab0..639d2af5f64 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -111,7 +111,7 @@ public class AllWindowedStream {
     public AllWindowedStream(DataStream input, WindowAssigner windowAssigner) {
         this.input = input.keyBy(new NullByteKeySelector());
         this.windowAssigner = windowAssigner;
-        this.trigger
[flink] 01/02: [FLINK-32979] Add getDefaultTrigger() to WindowAssigner
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29a6e06901d4880a2d115f74f3955762378b874d
Author: Wencong Liu
AuthorDate: Tue Aug 29 12:36:37 2023 +0800

    [FLINK-32979] Add getDefaultTrigger() to WindowAssigner
---
 .../api/windowing/assigners/WindowAssigner.java | 29 +-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 43533357eab..0bc1b0a7d36 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -54,7 +54,34 @@ public abstract class WindowAssigner implements Serializabl
     public abstract Collection assignWindows(
             T element, long timestamp, WindowAssignerContext context);
 
-    /** Returns the default trigger associated with this {@code WindowAssigner}. */
+    /**
+     * Returns the default trigger associated with this {@code WindowAssigner}.
+     *
+     * 1. If you override {@code getDefaultTrigger()}, the {@code getDefaultTrigger()} will be
+     * invoked and the {@code getDefaultTrigger(StreamExecutionEnvironment env)} won't be invoked.
+     * 2. If you don't override {@code getDefaultTrigger()}, the {@code
+     * getDefaultTrigger(StreamExecutionEnvironment env)} will be invoked in the default
+     * implementation of the {@code getDefaultTrigger()}.
+     */
+    public Trigger getDefaultTrigger() {
+        return getDefaultTrigger(new StreamExecutionEnvironment());
+    }
+
+    /**
+     * Returns the default trigger associated with this {@code WindowAssigner}.
+     *
+     * @deprecated the method is deprecated since Flink 1.19 because {@code
+     *     StreamExecutionEnvironment} is unused. Please use {@code getDefaultTrigger} and override
+     *     this method with an empty body instead. 1. If you override {@code getDefaultTrigger()},
+     *     the {@code getDefaultTrigger()} will be invoked and the {@code
+     *     getDefaultTrigger(StreamExecutionEnvironment env)} won't be invoked. 2. If you don't
+     *     override {@code getDefaultTrigger()}, the {@code
+     *     getDefaultTrigger(StreamExecutionEnvironment env)} will be invoked in the default
+     *     implementation of the {@code getDefaultTrigger()}.
+     * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425229
+     *     FLIP-343: Remove parameter in WindowAssigner#getDefaultTrigger()
+     */
+    @Deprecated
     public abstract Trigger getDefaultTrigger(StreamExecutionEnvironment env);
 
     /**
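
The two numbered cases in the Javadoc of this commit can be illustrated with a small, self-contained sketch. It is not Flink code: `Env`, `Assigner`, `LegacyAssigner` and `MigratedAssigner` are hypothetical stand-ins, and triggers are modelled as plain strings, purely to show how a new no-argument method can fall back to a deprecated overload so that existing subclasses keep working.

```java
// A minimal sketch of the "new method delegates to deprecated overload" pattern.
// All types here are hypothetical stand-ins, not Flink's real classes.
class DefaultTriggerSketch {

    /** Stand-in for the environment parameter that the deprecated overload takes. */
    static final class Env {}

    abstract static class Assigner {
        /** New entry point: by default it falls back to the deprecated overload. */
        public String getDefaultTrigger() {
            return getDefaultTrigger(new Env());
        }

        /** Deprecated overload, kept abstract so existing subclasses still compile. */
        @Deprecated
        public abstract String getDefaultTrigger(Env env);
    }

    /** Case 2: only the deprecated overload is implemented, so the default delegates to it. */
    static final class LegacyAssigner extends Assigner {
        @Override
        @Deprecated
        public String getDefaultTrigger(Env env) {
            return "legacy-trigger";
        }
    }

    /** Case 1: the new method is overridden, so the deprecated overload is never reached. */
    static final class MigratedAssigner extends Assigner {
        @Override
        public String getDefaultTrigger() {
            return "migrated-trigger";
        }

        @Override
        @Deprecated
        public String getDefaultTrigger(Env env) {
            throw new UnsupportedOperationException("not called via getDefaultTrigger()");
        }
    }

    public static void main(String[] args) {
        System.out.println(new LegacyAssigner().getDefaultTrigger());   // prints "legacy-trigger"
        System.out.println(new MigratedAssigner().getDefaultTrigger()); // prints "migrated-trigger"
    }
}
```

Callers only ever invoke the no-argument method, which is what the follow-up commit (02/02) does when it replaces `getDefaultTrigger(null)` with `getDefaultTrigger()`.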
[flink] branch master updated (484da993e22 -> 831cb8eff02)
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 484da993e22 [FLINK-31889][docs] Add documentation for implementing/loading enrichers
     new 29a6e06901d [FLINK-32979] Add getDefaultTrigger() to WindowAssigner
     new 831cb8eff02 [FLINK-32979] Migrate the usage of getDefaultTrigger(StreamExecutionEnvironment env) to getDefaultTrigger()

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../state/api/WindowedOperatorTransformation.java | 2 +-
 .../state/api/WindowedStateTransformation.java | 2 +-
 .../api/datastream/AllWindowedStream.java | 2 +-
 .../streaming/api/datastream/WindowedStream.java | 2 +-
 .../assigners/DynamicEventTimeSessionWindows.java | 7 ++
 .../DynamicProcessingTimeSessionWindows.java | 7 ++
 .../assigners/EventTimeSessionWindows.java | 6 +
 .../api/windowing/assigners/GlobalWindows.java | 6 +
 .../assigners/ProcessingTimeSessionWindows.java | 6 +
 .../assigners/SlidingEventTimeWindows.java | 6 +
 .../assigners/SlidingProcessingTimeWindows.java | 6 +
 .../assigners/TumblingEventTimeWindows.java | 6 +
 .../assigners/TumblingProcessingTimeWindows.java | 6 +
 .../api/windowing/assigners/WindowAssigner.java | 29 +-
 .../DynamicEventTimeSessionWindowsTest.java | 5 +---
 .../DynamicProcessingTimeSessionWindowsTest.java | 5 +---
 .../windowing/EventTimeSessionWindowsTest.java | 5 +---
 .../operators/windowing/GlobalWindowsTest.java | 5 +---
 .../operators/windowing/MergingWindowSetTest.java | 6 +
 .../ProcessingTimeSessionWindowsTest.java | 5 +---
 .../windowing/SlidingEventTimeWindowsTest.java | 5 +---
 .../SlidingProcessingTimeWindowsTest.java | 5 +---
 .../windowing/TumblingEventTimeWindowsTest.java | 5 +---
 .../TumblingProcessingTimeWindowsTest.java | 5 +---
 24 files changed, 103 insertions(+), 41 deletions(-)
[flink-connector-aws] branch main updated: [FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

The following commit(s) were added to refs/heads/main by this push:
     new b996d1d [FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer
b996d1d is described below

commit b996d1d854a221787cfbb46802b9f3c5bb0e6651
Author: Alexander Egorov
AuthorDate: Mon Sep 11 06:10:27 2023 -0500

    [FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer
---
 .../connectors/kinesis/FlinkKinesisConsumer.java | 10 ++--
 .../connectors/kinesis/util/KinesisStateUtil.java | 56
 .../kinesis/FlinkKinesisConsumerTest.java | 61 --
 .../testutils/TestableFlinkKinesisConsumer.java | 29 ++
 4 files changed, 120 insertions(+), 36 deletions(-)

diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3647176..c229a1c 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -47,6 +46,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.table.DefaultShardAssignerFactory;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.util.InstantiationUtil;
@@ -441,16 +441,14 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        TypeInformation> shardsStateTypeInfo =
-                new TupleTypeInfo<>(
-                        TypeInformation.of(StreamShardMetadata.class),
-                        TypeInformation.of(SequenceNumber.class));
         sequenceNumsStateForCheckpoint =
                 context.getOperatorStateStore()
                         .getUnionListState(
                                 new ListStateDescriptor<>(
-                                        sequenceNumsStateStoreName, shardsStateTypeInfo));
+                                        sequenceNumsStateStoreName,
+                                        KinesisStateUtil.createShardsStateSerializer(
+                                                getRuntimeContext().getExecutionConfig())));
 
         if (context.isRestored()) {
             if (sequenceNumsToRestore == null) {
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java
new file mode 100644
index 000..eba4440
--- /dev/null
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
[flink] branch release-1.17 updated: [FLINK-32962][python] Remove pip version check on installing dependencies.
This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.17 by this push:
     new f786dbea0be [FLINK-32962][python] Remove pip version check on installing dependencies.
f786dbea0be is described below

commit f786dbea0be25cc62dc10311a8d64d9de2ee20dd
Author: Aleksandr Pilipenko
AuthorDate: Fri Aug 25 21:37:00 2023 +0100

    [FLINK-32962][python] Remove pip version check on installing dependencies.
---
 .../python/util/PythonEnvironmentManagerUtils.java | 25 +++---
 1 file changed, 3 insertions(+), 22 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
index 2aecd009e6d..b271c41a63f 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
@@ -58,12 +58,6 @@ public class PythonEnvironmentManagerUtils {
                     + "import sysconfig;"
                     + "print(sysconfig.get_path('platlib', vars={'base': sys.argv[1], 'platbase': sys.argv[1]}))";
 
-    private static final String CHECK_PIP_VERSION_SCRIPT =
-            "import sys;"
-                    + "from pkg_resources import get_distribution, parse_version;"
-                    + "pip_version = get_distribution('pip').version;"
-                    + "print(parse_version(pip_version) >= parse_version(sys.argv[1]))";
-
     private static final String GET_RUNNER_DIR_SCRIPT =
             "import pyflink;"
                     + "import os;"
@@ -105,13 +99,9 @@ public class PythonEnvironmentManagerUtils {
                                 "install",
                                 "--ignore-installed",
                                 "-r",
-                                requirementsFilePath));
-        if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, environmentVariables)) {
-            commands.addAll(Arrays.asList("--prefix", requirementsInstallDir));
-        } else {
-            commands.addAll(
-                    Arrays.asList("--install-option", "--prefix=" + requirementsInstallDir));
-        }
+                                requirementsFilePath,
+                                "--prefix",
+                                requirementsInstallDir));
         if (requirementsCacheDir != null) {
             commands.addAll(Arrays.asList("--no-index", "--find-links", requirementsCacheDir));
         }
@@ -173,15 +163,6 @@ public class PythonEnvironmentManagerUtils {
         return String.join(File.pathSeparator, out.trim().split("\n"));
     }
 
-    private static boolean isPipVersionGreaterEqual(
-            String pipVersion, String pythonExecutable, Map<String, String> environmentVariables)
-            throws IOException {
-        String[] commands =
-                new String[] {pythonExecutable, "-c", CHECK_PIP_VERSION_SCRIPT, pipVersion};
-        String out = execute(commands, environmentVariables, false);
-        return Boolean.parseBoolean(out.trim());
-    }
-
     private static String execute(
             String[] commands, Map<String, String> environmentVariables, boolean logDetails)
             throws IOException {
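For readers skimming the hunks above, the net effect is that the install command no longer depends on the pip version: `--prefix` is always passed directly, and the `--install-option` fallback for pip older than 8.0.0 is gone. The following is a minimal stand-alone sketch of the resulting argument list, not the Flink class itself. The class and method names are hypothetical, and the leading `<python> -m pip` entries are an assumption, since they fall outside the lines shown in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-alone sketch of the post-change command list; not Flink's PythonEnvironmentManagerUtils. */
final class PipCommandSketch {

    static List<String> buildInstallCommand(
            String pythonExecutable,
            String requirementsFilePath,
            String requirementsInstallDir,
            String requirementsCacheDir) {
        // Assumption: the first three entries are not visible in the diff hunk.
        List<String> commands =
                new ArrayList<>(
                        Arrays.asList(
                                pythonExecutable,
                                "-m",
                                "pip",
                                "install",
                                "--ignore-installed",
                                "-r",
                                requirementsFilePath,
                                // Always passed now, with no pip version probe beforehand.
                                "--prefix",
                                requirementsInstallDir));
        if (requirementsCacheDir != null) {
            // Offline install from a local cache directory, as in the unchanged context lines.
            commands.addAll(Arrays.asList("--no-index", "--find-links", requirementsCacheDir));
        }
        return commands;
    }

    public static void main(String[] args) {
        // Example values are placeholders.
        System.out.println(
                String.join(
                        " ",
                        buildInstallCommand("python3", "requirements.txt", "/tmp/req", null)));
    }
}
```

Dropping the probe also removes one extra Python subprocess per install and the dependency on `pkg_resources` that the removed script imported.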
[flink] branch master updated: [FLINK-31889][docs] Add documentation for implementing/loading enrichers
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 484da993e22 [FLINK-31889][docs] Add documentation for implementing/loading enrichers
484da993e22 is described below

commit 484da993e22c30b0c9e2bcf747036a79443519fc
Author: Panagiotis Garefalakis
AuthorDate: Sun Apr 23 19:07:20 2023 -0700

    [FLINK-31889][docs] Add documentation for implementing/loading enrichers
---
 .../docs/deployment/advanced/failure_enrichers.md | 110 +
 .../docs/deployment/advanced/failure_enrichers.md | 110 +
 2 files changed, 220 insertions(+)

diff --git a/docs/content.zh/docs/deployment/advanced/failure_enrichers.md b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
new file mode 100644
index 000..0bbf45efd68
--- /dev/null
+++ b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
@@ -0,0 +1,110 @@
+---
+title: "Failure Enrichers"
+nav-title: failure-enrichers
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Custom failure enrichers
+Flink provides a pluggable interface for users to register their custom logic and enrich failures with extra metadata labels (string key-value pairs).
+This enables users to implement their own failure enrichment plugins to categorize job failures, expose custom metrics, or make calls to external notification systems.
+
+FailureEnrichers are triggered every time an exception is reported at runtime by the JobManager.
+Every FailureEnricher may asynchronously return labels associated with the failure that are then exposed via the JobManager's REST API (e.g., a 'type:System' label implying the failure is categorized as a system error).
+
+
+### Implement a plugin for your custom enricher
+
+To implement a custom FailureEnricher plugin, you need to:
+
+- Add your own FailureEnricher by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="FailureEnricher" >}} interface.
+
+- Add your own FailureEnricherFactory by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java" name="FailureEnricherFactory" >}} interface.
+
+- Add a service entry. Create a file `META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory` which contains the class name of your failure enricher factory class (see [Java Service Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html) docs for more details).
+
+
+Then, create a jar which includes your `FailureEnricher`, `FailureEnricherFactory`, `META-INF/services/` and all external dependencies.
+Make a directory in `plugins/` of your Flink distribution with an arbitrary name, e.g. "failure-enrichment", and put the jar into this directory.
+See [Flink Plugin]({% link deployment/filesystems/plugins.md %}) for more details.
+
+{{< hint warning >}}
+Note that every FailureEnricher should have defined a set of {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="output keys" >}} that may be associated with values. This set of keys has to be unique otherwise all enrichers with overlapping keys will be ignored.
+{{< /hint >}}
+
+FailureEnricherFactory example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class TestFailureEnricherFactory implements FailureEnricherFactory {
+
+    @Override
+    public FailureEnricher createFailureEnricher(Configuration conf) {
+        return new CustomEnricher();
+    }
+}
+```
+
+FailureEnricher example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class CustomEnricher implements FailureEnricher {
+    private final Set<String> outputKeys;
+
+    public CustomEnricher() {
+        this.outputKeys = Collections.singleton("labelKey");
+    }
+
+    @Override
+    public Set<String> getOutputKeys() {
+        return outputKeys;
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(
+            Throwable cause, Context context) {
+        return CompletableFuture.completedFuture(Collections.singletonMap("labelKey", "labelValue"));
+    }
+}
+```
+
+### Configuration
+
+The JobManager loads FailureEnricher plugins at startup. To make sure your FailureEnrichers are loaded all class names should be defined as part of [jobmanager.failure-enrichers configuration]({{< ref "docs/deployment/config#jobmanager-failure-enrichers" >}}).
+If this configuration is empty, NO enrichers will be started. Example:
+```
+jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher
+```
+
+### Validation
+
+To validate that your FailureEnricher is loaded, you can check the JobManager logs for the following line:
+```
+Found failure enricher
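The warning in the documentation above states that output keys must be unique and that all enrichers with overlapping keys are ignored. The sketch below illustrates that rule with plain JDK collections. It is not Flink's loading code: the class name, the method name, and the "enricher name to declared keys" map are all hypothetical stand-ins chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Stand-alone illustration of the unique-output-keys rule; not Flink code. */
final class OutputKeyCheckSketch {

    /**
     * @param keysByEnricher hypothetical map from enricher name to its declared output keys
     * @return names of enrichers whose keys collide with no other enricher, in input order
     */
    static List<String> withoutOverlaps(Map<String, Set<String>> keysByEnricher) {
        Map<String, String> firstOwner = new HashMap<>();
        Set<String> rejected = new HashSet<>();
        for (Map.Entry<String, Set<String>> entry : keysByEnricher.entrySet()) {
            for (String key : entry.getValue()) {
                String owner = firstOwner.putIfAbsent(key, entry.getKey());
                if (owner != null) {
                    // Both sides of a collision are dropped, not only the later one.
                    rejected.add(owner);
                    rejected.add(entry.getKey());
                }
            }
        }
        List<String> accepted = new ArrayList<>();
        for (String name : keysByEnricher.keySet()) {
            if (!rejected.contains(name)) {
                accepted.add(name);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> declared = new LinkedHashMap<>();
        declared.put("EnricherA", Set.of("type"));
        declared.put("EnricherB", Set.of("type", "team"));
        declared.put("EnricherC", Set.of("severity"));
        // EnricherA and EnricherB both claim "type", so only EnricherC remains.
        System.out.println(withoutOverlaps(declared));
    }
}
```

A practical consequence, assuming the documented behavior, is that a newly added plugin can silently disable an existing one if their key names clash, so prefixing keys per plugin is a cheap safeguard.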
[flink] branch release-1.18 updated: [FLINK-31889][docs] Add documentation for implementing/loading enrichers
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 7339a5359cb [FLINK-31889][docs] Add documentation for implementing/loading enrichers
7339a5359cb is described below

commit 7339a5359cb1ea1e718c00037bc322b02cc523e2
Author: Panagiotis Garefalakis
AuthorDate: Sun Apr 23 19:07:20 2023 -0700

    [FLINK-31889][docs] Add documentation for implementing/loading enrichers
---
 .../docs/deployment/advanced/failure_enrichers.md | 110 +
 .../docs/deployment/advanced/failure_enrichers.md | 110 +
 2 files changed, 220 insertions(+)

diff --git a/docs/content.zh/docs/deployment/advanced/failure_enrichers.md b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
new file mode 100644
index 000..0bbf45efd68
--- /dev/null
+++ b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
@@ -0,0 +1,110 @@
+---
+title: "Failure Enrichers"
+nav-title: failure-enrichers
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Custom failure enrichers
+Flink provides a pluggable interface for users to register their custom logic and enrich failures with extra metadata labels (string key-value pairs).
+This enables users to implement their own failure enrichment plugins to categorize job failures, expose custom metrics, or make calls to external notification systems.
+
+FailureEnrichers are triggered every time an exception is reported at runtime by the JobManager.
+Every FailureEnricher may asynchronously return labels associated with the failure that are then exposed via the JobManager's REST API (e.g., a 'type:System' label implying the failure is categorized as a system error).
+
+
+### Implement a plugin for your custom enricher
+
+To implement a custom FailureEnricher plugin, you need to:
+
+- Add your own FailureEnricher by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="FailureEnricher" >}} interface.
+
+- Add your own FailureEnricherFactory by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java" name="FailureEnricherFactory" >}} interface.
+
+- Add a service entry. Create a file `META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory` which contains the class name of your failure enricher factory class (see [Java Service Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html) docs for more details).
+
+
+Then, create a jar which includes your `FailureEnricher`, `FailureEnricherFactory`, `META-INF/services/` and all external dependencies.
+Make a directory in `plugins/` of your Flink distribution with an arbitrary name, e.g. "failure-enrichment", and put the jar into this directory.
+See [Flink Plugin]({% link deployment/filesystems/plugins.md %}) for more details.
+
+{{< hint warning >}}
+Note that every FailureEnricher should have defined a set of {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="output keys" >}} that may be associated with values. This set of keys has to be unique otherwise all enrichers with overlapping keys will be ignored.
+{{< /hint >}}
+
+FailureEnricherFactory example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class TestFailureEnricherFactory implements FailureEnricherFactory {
+
+    @Override
+    public FailureEnricher createFailureEnricher(Configuration conf) {
+        return new CustomEnricher();
+    }
+}
+```
+
+FailureEnricher example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class CustomEnricher implements FailureEnricher {
+    private final Set<String> outputKeys;
+
+    public CustomEnricher() {
+        this.outputKeys = Collections.singleton("labelKey");
+    }
+
+    @Override
+    public Set<String> getOutputKeys() {
+        return outputKeys;
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(
+            Throwable cause, Context context) {
+        return CompletableFuture.completedFuture(Collections.singletonMap("labelKey", "labelValue"));
+    }
+}
+```
+
+### Configuration
+
+The JobManager loads FailureEnricher plugins at startup. To make sure your FailureEnrichers are loaded all class names should be defined as part of [jobmanager.failure-enrichers configuration]({{< ref "docs/deployment/config#jobmanager-failure-enrichers" >}}).
+If this configuration is empty, NO enrichers will be started. Example:
+```
+jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher
+```
+
+### Validation
+
+To validate that your FailureEnricher is loaded, you can check the JobManager logs for the following line:
+```
+Found failure enricher
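The documentation above mentions a 'type:System' label as an example of what an enricher may return asynchronously. The sketch below shows one way such a label could be derived from the failure cause. It deliberately does not implement Flink's `FailureEnricher` interface so that it compiles with the JDK alone. The class name, the `labelsFor` method, and the classification rule (JVM `Error`s count as system failures, everything else as user failures) are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Stand-alone sketch of label derivation; it does not implement Flink's FailureEnricher. */
final class TypeLabelSketch {

    // Hypothetical output key, mirroring the 'type:System' example from the docs.
    static final Set<String> OUTPUT_KEYS = Collections.singleton("type");

    /** Computes the label off the calling thread, echoing the asynchronous contract. */
    static CompletableFuture<Map<String, String>> labelsFor(Throwable cause) {
        return CompletableFuture.supplyAsync(
                () -> {
                    // Assumed rule, not Flink's: Errors are "System", all other throwables "User".
                    String type = (cause instanceof Error) ? "System" : "User";
                    return Collections.singletonMap("type", type);
                });
    }

    public static void main(String[] args) {
        System.out.println(labelsFor(new OutOfMemoryError()).join());
        System.out.println(labelsFor(new IllegalStateException("bad record")).join());
    }
}
```

In a real plugin the same logic would sit inside `processFailure`, and every returned key would have to be declared in `getOutputKeys()`.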