[flink] branch master updated (ade583cf804 -> e9353319ad6)

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from ade583cf804 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer
 new 798a20a2c94 [FLINK-32978][flink-core] Deprecate RichFunction#open(Configuration parameters)
 new e9353319ad6 [FLINK-32978][flink-core] Migrate the usage of RichFunction#open(Configuration parameters) to RichFunction#open(OpenContext openContext)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/dev/datastream/operators/asyncio.md   |  2 +-
 .../dev/datastream/operators/process_function.md   |  2 +-
 docs/content.zh/docs/dev/table/data_stream_api.md  |  2 +-
 docs/content.zh/docs/dev/table/sourcesSinks.md |  2 +-
 docs/content.zh/docs/libs/state_processor_api.md   |  6 +--
 docs/content.zh/docs/try-flink/datastream.md   |  6 +--
 .../docs/dev/datastream/operators/asyncio.md   |  2 +-
 .../dev/datastream/operators/process_function.md   |  2 +-
 docs/content/docs/dev/table/data_stream_api.md |  2 +-
 docs/content/docs/dev/table/sourcesSinks.md|  2 +-
 docs/content/docs/libs/state_processor_api.md  |  6 +--
 docs/content/docs/try-flink/datastream.md  |  6 +--
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e   |  8 +--
 .../source/reader/CoordinatedSourceITCase.java |  4 +-
 .../batch/compact/BatchPartitionCommitterSink.java |  4 +-
 .../compact/BatchPartitionCommitterSinkTest.java   |  4 +-
 .../hive/factories/TestLockTableSinkFactory.java   |  4 +-
 .../mapred/HadoopMapFunction.java  |  8 +--
 .../mapred/HadoopReduceCombineFunction.java|  8 +--
 .../mapred/HadoopReduceFunction.java   |  8 +--
 .../flink/api/common/aggregators/Aggregator.java   |  2 +-
 .../api/common/functions/AbstractRichFunction.java |  4 +-
 .../api/common/functions/DefaultOpenContext.java   | 10 ++--
 .../flink/api/common/functions/OpenContext.java| 12 ++---
 .../common/functions/RichAggregateFunction.java|  3 +-
 .../api/common/functions/RichCoGroupFunction.java  |  3 +-
 .../api/common/functions/RichCrossFunction.java|  3 +-
 .../api/common/functions/RichFilterFunction.java   |  3 +-
 .../api/common/functions/RichFlatJoinFunction.java |  3 +-
 .../api/common/functions/RichFlatMapFunction.java  |  3 +-
 .../flink/api/common/functions/RichFunction.java   | 55 +++
 .../common/functions/RichGroupCombineFunction.java |  3 +-
 .../common/functions/RichGroupReduceFunction.java  |  3 +-
 .../api/common/functions/RichJoinFunction.java |  3 +-
 .../api/common/functions/RichMapFunction.java  |  3 +-
 .../common/functions/RichMapPartitionFunction.java |  3 +-
 .../api/common/functions/RichReduceFunction.java   |  3 +-
 .../api/common/functions/util/FunctionUtils.java   |  6 +--
 .../common/operators/base/BulkIterationBase.java   |  4 +-
 .../common/operators/base/CoGroupOperatorBase.java |  3 +-
 .../operators/base/CoGroupRawOperatorBase.java |  3 +-
 .../common/operators/base/CrossOperatorBase.java   |  3 +-
 .../common/operators/base/FilterOperatorBase.java  |  3 +-
 .../common/operators/base/FlatMapOperatorBase.java |  3 +-
 .../operators/base/GroupCombineOperatorBase.java   |  3 +-
 .../operators/base/GroupReduceOperatorBase.java|  3 +-
 .../operators/base/InnerJoinOperatorBase.java  |  3 +-
 .../api/common/operators/base/MapOperatorBase.java |  3 +-
 .../operators/base/MapPartitionOperatorBase.java   |  3 +-
 .../operators/base/OuterJoinOperatorBase.java  |  3 +-
 .../common/operators/base/ReduceOperatorBase.java  |  3 +-
 .../base/FlatMapOperatorCollectionTest.java|  4 +-
 .../operators/base/InnerJoinOperatorBaseTest.java  |  4 +-
 .../api/common/operators/base/MapOperatorTest.java |  4 +-
 .../operators/base/OuterJoinOperatorBaseTest.java  |  4 +-
 .../operators/base/PartitionMapOperatorTest.java   |  4 +-
 .../api/java/typeutils/TypeExtractorTest.java  |  4 +-
 .../tests/BlockingIncrementingMapFunction.java |  4 +-
 .../streaming/tests/SemanticsCheckMapper.java  |  4 +-
 .../streaming/tests/SlidingWindowCheckMapper.java  |  4 +-
 .../NettyShuffleMemoryControlTestProgram.java  |  4 +-
 .../tests/queryablestate/QsStateProducer.java  |  8 +--
 .../RocksDBStateMemoryControlTestProgram.java  |  8 +--
 .../apache/flink/test/StatefulStreamingJob.java|  6 +--
 .../flink/examples/java/clustering/KMeans.java |  4 +-
 .../apache/flink/examples/java/distcp/DistCp.java  |  4 +-
 .../relational/EmptyFieldsCountAccumulator.java|  6 +--
 .../streaming/examples/async/AsyncIOExample.java   |  4 +-
 .../streaming/examples/gpu/MatrixVectorMul.java|  4 

[flink] 01/02: [FLINK-32978][flink-core] Deprecate RichFunction#open(Configuration parameters)

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 798a20a2c948db976ff7ef9e24f92191834bd921
Author: Wencong Liu 
AuthorDate: Fri Sep 8 11:04:27 2023 +0800

[FLINK-32978][flink-core] Deprecate RichFunction#open(Configuration parameters)
---
 .../api/common/functions/DefaultOpenContext.java   | 28 +++
 .../flink/api/common/functions/OpenContext.java| 29 
 .../flink/api/common/functions/RichFunction.java   | 55 ++
 3 files changed, 112 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/DefaultOpenContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/DefaultOpenContext.java
new file mode 100644
index 000..21fca4e4c31
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/DefaultOpenContext.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** The default implementation of {@link OpenContext}. */
+@PublicEvolving
+public class DefaultOpenContext implements OpenContext {
+
+public static final OpenContext INSTANCE = new DefaultOpenContext();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 000..4ff5484b3b0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be
+ * used to add more methods without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index 240b228557f..ae83fb30f2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 
 /**
@@ -61,9 +62,63 @@ public interface RichFunction extends Function {
 * When the runtime catches an exception, it aborts the task and lets the fail-over logic
 * decide whether to retry the task execution.
  * @see org.apache.flink.configuration.Configuration
+ * @deprecated This method is deprecated since Flink 1.19. The users are recommended to
+ *     implement {@code open(OpenContext openContext)} and implement {@code open(Configuration
+ *     parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext
+ *     openContext)}, the {@code
[flink] 02/02: [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d100ab65367fe0b3d74a9901bcaaa26049c996b0
Author: Yuxin Tan 
AuthorDate: Tue Aug 22 15:34:52 2023 +0800

[FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

This closes #23255
---
 .../network/partition/BufferReaderWriterUtil.java  |   2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java  |   7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java|   5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java |   9 +-
 .../hybrid/tiered/file/PartitionFileReader.java|  79 -
 .../file/ProducerMergedPartitionFileIndex.java |  36 +-
 .../file/ProducerMergedPartitionFileReader.java| 383 ++---
 .../file/ProducerMergedPartitionFileWriter.java|   3 +-
 .../tiered/file/SegmentPartitionFileReader.java|  29 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java   |  98 --
 .../tier/remote/RemoteTierConsumerAgent.java   |  17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java   |   9 +-
 .../hybrid/index/FileDataIndexCacheTest.java   |   2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java |   9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java   |  22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java|   8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java |   2 +-
 .../ProducerMergedPartitionFileReaderTest.java | 132 ---
 .../file/SegmentPartitionFileReaderTest.java   |  25 +-
 .../tiered/file/TestingPartitionFileReader.java|  21 +-
 20 files changed, 642 insertions(+), 256 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
index 863be5b5b44..f89994c2d44 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
@@ -252,7 +252,7 @@ public final class BufferReaderWriterUtil {
 }
 }
 
-static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException {
+public static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException {
 // the post-checked loop here gets away with one less check in the normal case
 do {
 if (channel.read(b) == -1) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index 4b4221225cf..25012ffa8f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -220,10 +220,15 @@ public class HsFileDataIndexImpl implements HsFileDataIndex {
 }
 
 @Override
-public long getRegionFileOffset() {
+public long getRegionStartOffset() {
 return regionFileOffset;
 }
 
+@Override
+public long getRegionEndOffset() {
+throw new UnsupportedOperationException("This method is not supported.");
+}
+
 @Override
 public int getNumBuffers() {
 return numBuffers;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
index c0d4270e109..77638bb3156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
@@ -77,7 +77,10 @@ public interface FileDataIndexRegionHelper
+ * Note that the implementation of the interface should strongly bind with the implementation
+ * of {@link PartitionFileReader}.
+ */
+interface ReadProgress {}
+
+/**
+ * A wrapper class of the reading buffer result, including the read buffers, the hint of
+ * continue reading, and the read progress, etc.
+ */
+class ReadBufferResult {
+
+/** The read buffers. */
+private final List<Buffer> readBuffers;
+
+/**
+ * A hint to determine whether the caller may continue reading the following buffers. Note
+ * that this hint is merely a recommendation and not obligatory. Following the hint while
+ * reading buffers may improve performance.
+ */
+
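
The slicing approach named in the commit title can be sketched as follows. All names and the 4-byte length framing are hypothetical stand-ins for Flink's real buffer headers: one large region is read from the file channel in a single I/O call and then cut into zero-copy views.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.ArrayList;
    import java.util.List;

    public final class BufferSlicing {

        // Reads one large region at the given offset and slices it into per-buffer
        // views, replacing one read() call per small buffer with a single bulk read.
        static List<ByteBuffer> readAndSlice(FileChannel channel, long offset, int regionSize)
                throws IOException {
            ByteBuffer large = ByteBuffer.allocateDirect(regionSize);
            channel.read(large, offset);
            large.flip();

            List<ByteBuffer> slices = new ArrayList<>();
            while (large.remaining() >= Integer.BYTES) {
                int length = large.getInt();
                if (length < 0 || length > large.remaining()) {
                    break; // malformed or truncated frame
                }
                ByteBuffer slice = large.slice(); // zero-copy view into the large buffer
                slice.limit(length);
                slices.add(slice);
                large.position(large.position() + length);
            }
            return slices;
        }
    }
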

[flink] 01/02: [hotfix][network] Fix the close method for the tiered producer client

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae647e2b489f22a32b25d3625e5a7d2090e85f06
Author: Yuxin Tan 
AuthorDate: Mon Aug 14 19:45:34 2023 +0800

[hotfix][network] Fix the close method for the tiered producer client
---
 .../network/partition/hybrid/tiered/shuffle/TieredResultPartition.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
index d3aae631620..f7070165fa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
@@ -180,13 +180,13 @@ public class TieredResultPartition extends ResultPartition {
 @Override
 public void finish() throws IOException {
 broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+tieredStorageProducerClient.close();
 checkState(!isReleased(), "Result partition is already released.");
 super.finish();
 }
 
 @Override
 public void close() {
-tieredStorageProducerClient.close();
 super.close();
 }
 



[flink] branch release-1.18 updated (f2584a1df36 -> d100ab65367)

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


from f2584a1df36 [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)
 new ae647e2b489 [hotfix][network] Fix the close method for the tiered producer client
 new d100ab65367 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../network/partition/BufferReaderWriterUtil.java  |   2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java  |   7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java|   5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java |   9 +-
 .../hybrid/tiered/file/PartitionFileReader.java|  79 -
 .../file/ProducerMergedPartitionFileIndex.java |  36 +-
 .../file/ProducerMergedPartitionFileReader.java| 383 ++---
 .../file/ProducerMergedPartitionFileWriter.java|   3 +-
 .../tiered/file/SegmentPartitionFileReader.java|  29 +-
 .../tiered/shuffle/TieredResultPartition.java  |   2 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java   |  98 --
 .../tier/remote/RemoteTierConsumerAgent.java   |  17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java   |   9 +-
 .../hybrid/index/FileDataIndexCacheTest.java   |   2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java |   9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java   |  22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java|   8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java |   2 +-
 .../ProducerMergedPartitionFileReaderTest.java | 132 ---
 .../file/SegmentPartitionFileReaderTest.java   |  25 +-
 .../tiered/file/TestingPartitionFileReader.java|  21 +-
 21 files changed, 643 insertions(+), 257 deletions(-)



[flink] 01/02: [hotfix][network] Fix the close method for the tiered producer client

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e41b0b2676166de582c4dd9f0289c585c31aca3b
Author: Yuxin Tan 
AuthorDate: Mon Aug 14 19:45:34 2023 +0800

[hotfix][network] Fix the close method for the tiered producer client
---
 .../network/partition/hybrid/tiered/shuffle/TieredResultPartition.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
index d3aae631620..f7070165fa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
@@ -180,13 +180,13 @@ public class TieredResultPartition extends ResultPartition {
 @Override
 public void finish() throws IOException {
 broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+tieredStorageProducerClient.close();
 checkState(!isReleased(), "Result partition is already released.");
 super.finish();
 }
 
 @Override
 public void close() {
-tieredStorageProducerClient.close();
 super.close();
 }
 



[flink] branch master updated (649b7fe197c -> ade583cf804)

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 649b7fe197c [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3
 new e41b0b26761 [hotfix][network] Fix the close method for the tiered producer client
 new ade583cf804 [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../network/partition/BufferReaderWriterUtil.java  |   2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java  |   7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java|   5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java |   9 +-
 .../hybrid/tiered/file/PartitionFileReader.java|  79 -
 .../file/ProducerMergedPartitionFileIndex.java |  36 +-
 .../file/ProducerMergedPartitionFileReader.java| 383 ++---
 .../file/ProducerMergedPartitionFileWriter.java|   3 +-
 .../tiered/file/SegmentPartitionFileReader.java|  29 +-
 .../tiered/shuffle/TieredResultPartition.java  |   2 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java   |  98 --
 .../tier/remote/RemoteTierConsumerAgent.java   |  17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java   |   9 +-
 .../hybrid/index/FileDataIndexCacheTest.java   |   2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java |   9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java   |  22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java|   8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java |   2 +-
 .../ProducerMergedPartitionFileReaderTest.java | 132 ---
 .../file/SegmentPartitionFileReaderTest.java   |  25 +-
 .../tiered/file/TestingPartitionFileReader.java|  21 +-
 21 files changed, 643 insertions(+), 257 deletions(-)



[flink] 02/02: [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ade583cf80478ac60e9b83fc0a97f36bc2b26f1c
Author: Yuxin Tan 
AuthorDate: Tue Aug 22 15:34:52 2023 +0800

[FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

This closes #23255
---
 .../network/partition/BufferReaderWriterUtil.java  |   2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java  |   7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java|   5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java |   9 +-
 .../hybrid/tiered/file/PartitionFileReader.java|  79 -
 .../file/ProducerMergedPartitionFileIndex.java |  36 +-
 .../file/ProducerMergedPartitionFileReader.java| 383 ++---
 .../file/ProducerMergedPartitionFileWriter.java|   3 +-
 .../tiered/file/SegmentPartitionFileReader.java|  29 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java   |  98 --
 .../tier/remote/RemoteTierConsumerAgent.java   |  17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java   |   9 +-
 .../hybrid/index/FileDataIndexCacheTest.java   |   2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java |   9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java   |  22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java|   8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java |   2 +-
 .../ProducerMergedPartitionFileReaderTest.java | 132 ---
 .../file/SegmentPartitionFileReaderTest.java   |  25 +-
 .../tiered/file/TestingPartitionFileReader.java|  21 +-
 20 files changed, 642 insertions(+), 256 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
index 863be5b5b44..f89994c2d44 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
@@ -252,7 +252,7 @@ public final class BufferReaderWriterUtil {
 }
 }
 
-static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException {
+public static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException {
 // the post-checked loop here gets away with one less check in the normal case
 do {
 if (channel.read(b) == -1) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index 4b4221225cf..25012ffa8f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -220,10 +220,15 @@ public class HsFileDataIndexImpl implements HsFileDataIndex {
 }
 
 @Override
-public long getRegionFileOffset() {
+public long getRegionStartOffset() {
 return regionFileOffset;
 }
 
+@Override
+public long getRegionEndOffset() {
+throw new UnsupportedOperationException("This method is not supported.");
+}
+
 @Override
 public int getNumBuffers() {
 return numBuffers;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
index c0d4270e109..77638bb3156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
@@ -77,7 +77,10 @@ public interface FileDataIndexRegionHelper
+ * Note that the implementation of the interface should strongly bind with the implementation
+ * of {@link PartitionFileReader}.
+ */
+interface ReadProgress {}
+
+/**
+ * A wrapper class of the reading buffer result, including the read buffers, the hint of
+ * continue reading, and the read progress, etc.
+ */
+class ReadBufferResult {
+
+/** The read buffers. */
+private final List<Buffer> readBuffers;
+
+/**
+ * A hint to determine whether the caller may continue reading the following buffers. Note
+ * that this hint is merely a recommendation and not obligatory. Following the hint while
+ * reading buffers may improve performance.
+ */
+

[flink] branch release-1.18 updated: [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)

2023-09-11 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new f2584a1df36 [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)
f2584a1df36 is described below

commit f2584a1df364a14ff50b5a52fe7cf5e38d4cdc9a
Author: yunhong <337361...@qq.com>
AuthorDate: Tue Sep 12 11:08:48 2023 +0800

[FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser (#23388)

Co-authored-by: zhengyunhong.zyh 
---
 .../plan/utils/JavaUserDefinedAggFunctions.java| 80 ++
 .../runtime/stream/sql/AggregateITCase.scala   | 30 +++-
 .../table/runtime/typeutils/TypeCheckUtils.java| 12 +++-
 3 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java
index 64cb9bbd2bd..4e722be8ee3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java
@@ -25,7 +25,11 @@ import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.functions.AggregateFunction;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /** Test aggregator functions. */
 public class JavaUserDefinedAggFunctions {
@@ -421,4 +425,80 @@ public class JavaUserDefinedAggFunctions {
 return Tuple1.of(0L);
 }
 }
+
+/** User defined pojo object. */
+public static class TestObject {
+private final String a;
+
+public TestObject(String a) {
+this.a = a;
+}
+
+public String getA() {
+return a;
+}
+}
+
+/** User defined object. */
+public static class UserDefinedObject {
+// List with user defined pojo object.
+public List<TestObject> testObjectList = new ArrayList<>();
+// Map with user defined pojo object.
+public Map<String, TestObject> testObjectMap = new HashMap<>();
+}
+
+/** User defined UDAF whose value and acc is user defined complex pojo object. */
+public static class UserDefinedObjectUDAF
+extends AggregateFunction<UserDefinedObject, UserDefinedObject> {
+private static final String KEY = "key";
+
+@Override
+public UserDefinedObject getValue(UserDefinedObject accumulator) {
+return accumulator;
+}
+
+@Override
+public UserDefinedObject createAccumulator() {
+return new UserDefinedObject();
+}
+
+public void accumulate(UserDefinedObject acc, String a) {
+if (a != null) {
+acc.testObjectList.add(new TestObject(a));
+acc.testObjectMap.put(KEY, new TestObject(a));
+}
+}
+
+public void retract(UserDefinedObject acc, UserDefinedObject a) {
+// do nothing.
+}
+}
+
+/** User defined UDAF whose value and acc is user defined complex pojo object. */
+public static class UserDefinedObjectUDAF2
+extends AggregateFunction<String, UserDefinedObject> {
+private static final String KEY = "key";
+
+@Override
+public String getValue(UserDefinedObject accumulator) {
+if (accumulator.testObjectMap.containsKey(KEY)) {
+return accumulator.testObjectMap.get(KEY).getA();
+}
+return null;
+}
+
+@Override
+public UserDefinedObject createAccumulator() {
+return new UserDefinedObject();
+}
+
+public void accumulate(UserDefinedObject acc, UserDefinedObject a) {
+acc.testObjectList = a.testObjectList;
+acc.testObjectMap = a.testObjectMap;
+}
+
+public void retract(UserDefinedObject acc, UserDefinedObject a) {
+// do nothing
+}
+}
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 152c8d02be4..4df20e1615d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 

[flink] branch release-1.18 updated: [FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338)

2023-09-11 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new f66679bfe5f [FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338)
f66679bfe5f is described below

commit f66679bfe5f5b344eec71a7579504762cc3c04ae
Author: yunhong <337361...@qq.com>
AuthorDate: Tue Sep 12 11:07:23 2023 +0800

[FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338)

Co-authored-by: zhengyunhong.zyh 
---
 .../flink/table/planner/plan/reuse/ScanReuser.java |  16 +-
 .../table/planner/plan/reuse/ScanReuserUtils.java  |  12 +-
 .../table/planner/plan/optimize/ScanReuseTest.java |  76 
 .../table/planner/plan/optimize/ScanReuseTest.xml  | 214 +++--
 4 files changed, 243 insertions(+), 75 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java
index 1ef3bf2f2b1..9128e110346 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java
@@ -170,12 +170,18 @@ public class ScanReuser {
 // 2. Create new source.
 List<SourceAbilitySpec> specs = abilitySpecsWithoutEscaped(pickTable);
 
-// 2.1 Apply projections
-List<SourceAbilitySpec> newSpecs = new ArrayList<>();
+// 2.1 Create produced type.
+// The source produced type is the input type into the runtime. The format looks as:
+// PHYSICAL COLUMNS + METADATA COLUMNS. While re-computing the source ability specs with
+// source metadata, we need to distinguish between schema type and produced type, which
+// source ability specs use produced type instead of schema type.
 RowType originType =
 DynamicSourceUtils.createProducedType(
 pickTable.contextResolvedTable().getResolvedSchema(),
 pickTable.tableSource());
+
+// 2.2 Apply projections
+List<SourceAbilitySpec> newSpecs = new ArrayList<>();
 RowType newSourceType =
 applyPhysicalAndMetadataPushDown(
 pickTable.tableSource(),
@@ -190,15 +196,15 @@ public class ScanReuser {
 allMetaKeys);
 specs.addAll(newSpecs);
 
-// 2.2 Watermark spec
+// 2.3 Watermark spec
 Optional<SourceAbilitySpec> watermarkSpec =
-getAdjustedWatermarkSpec(pickTable, newSourceType);
+getAdjustedWatermarkSpec(pickTable, originType, newSourceType);
 if (watermarkSpec.isPresent()) {
 specs.add(watermarkSpec.get());
 newSourceType = watermarkSpec.get().getProducedType().get();
 }
 
-// 2.3 Create a new ScanTableSource. ScanTableSource can not be pushed down twice.
+// 2.4 Create a new ScanTableSource. ScanTableSource can not be pushed down twice.
 DynamicTableSourceSpec tableSourceSpec =
 new DynamicTableSourceSpec(pickTable.contextResolvedTable(), specs);
 ScanTableSource newTableSource =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
index 5c79cd4cd25..43a00d720a2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
@@ -170,24 +170,18 @@ public class ScanReuserUtils {
 
 /** Watermark push down must be after projection push down, so we need to adjust its index. */
 public static Optional<SourceAbilitySpec> getAdjustedWatermarkSpec(
-TableSourceTable table, RowType newSourceType) {
-RowType producedType =
-(RowType)
-table.contextResolvedTable()
-.getResolvedSchema()
-.toSourceRowDataType()
-.getLogicalType();
+TableSourceTable table, RowType oldSourceType, RowType newSourceType) {
 for (SourceAbilitySpec spec : table.abilitySpecs()) {
 if (spec instanceof WatermarkPushDownSpec) {
 return Optional.of(
 adjustWatermarkIndex(

[flink] branch master updated: [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3

2023-09-11 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 649b7fe197c [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3
649b7fe197c is described below

commit 649b7fe197c8b03cce9595adcfea33c8d708a8b4
Author: Shengkai <1059623...@qq.com>
AuthorDate: Thu Sep 7 17:55:31 2023 +0800

[FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3

This closes #23370
---
 .../org/apache/flink/tests/hive/HiveITCase.java|   6 +-
 .../flink/tests/hive/containers/HiveContainer.java | 188 
 .../tests/hive/containers/HiveContainers.java  | 246 +
 3 files changed, 249 insertions(+), 191 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java
index 5b310688f89..24a759887c5 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.test.util.SQLJobSubmission;
-import org.apache.flink.tests.hive.containers.HiveContainer;
+import org.apache.flink.tests.hive.containers.HiveContainers;
 import org.apache.flink.tests.util.flink.ClusterController;
 import org.apache.flink.tests.util.flink.FlinkResource;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
@@ -69,8 +69,8 @@ public class HiveITCase extends TestLogger {
 @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
 @ClassRule
-public static final HiveContainer HIVE_CONTAINER =
-new HiveContainer(
+public static final HiveContainers.HiveContainer HIVE_CONTAINER =
+HiveContainers.createHiveContainer(
 Arrays.asList("hive_sink1", "hive_sink2", "h_table_sink1", "h_table_sink2"));
 
 private static final String HIVE_ADD_ONE_UDF_CLASS = "HiveAddOneFunc";
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java
deleted file mode 100644
index d4ca4080023..000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.hive.containers;
-
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.test.parameters.ParameterProperty;
-import org.apache.flink.util.DockerImageVersions;
-
-import com.github.dockerjava.api.command.InspectContainerResponse;
-import okhttp3.FormBody;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-import org.junit.runner.Description;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.UUID;
-
-/** Test container for Hive. */
-public class HiveContainer extends GenericContainer<HiveContainer> {
-
-private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class);
-public static final String HOST_NAME = "hadoop-master";
-public static final int HIVE_METASTORE_PORT = 9083;
-
-private static final boolean HIVE_310_OR_LATER =
-
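
For background on the port in the commit title (a sketch, not part of this patch): Hadoop 3 moved the NameNode HTTP UI from port 50070 to 9870, which is what the updated containers must probe. The configured default can be inspected via the HDFS client API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class NameNodePortCheck {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // On Hadoop 3, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT resolves to 0.0.0.0:9870;
            // on Hadoop 2 the default port was 50070.
            System.out.println(
                    conf.get(
                            DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
                            DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
        }
    }
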

[flink-connector-elasticsearch] branch main updated: [FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (#39)

2023-09-11 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
 new fef5c96  [FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (#39)
fef5c96 is described below

commit fef5c964f4a2fc8b61c482b0bc8504757581adfb
Author: Yubin Li 
AuthorDate: Tue Sep 12 04:31:20 2023 +0800

[FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (#39)
---
 flink-connector-elasticsearch-base/pom.xml |   8 +
 .../elasticsearch/ElasticsearchApiCallBridge.java  |  17 ++
 .../table/ElasticsearchRowDataLookupFunction.java  | 183 +
 .../elasticsearch/ElasticsearchSinkBaseTest.java   |  12 ++
 .../pom.xml|   5 +
 .../tests/ElasticsearchLookupE2ECase.java  | 126 ++
 .../pom.xml|  18 ++
 .../tests/Elasticsearch6LookupE2ECase.java |  44 +
 .../pom.xml|  18 ++
 .../tests/Elasticsearch7LookupE2ECase.java |  43 +
 .../table/Elasticsearch6DynamicSource.java | 135 +++
 ...java => Elasticsearch6DynamicTableFactory.java} |  74 -
 .../Elasticsearch6ApiCallBridge.java   |  23 ++-
 .../org.apache.flink.table.factories.Factory   |   2 +-
 ... => Elasticsearch6DynamicTableFactoryTest.java} |   2 +-
 .../table/Elasticsearch6DynamicSinkITCase.java |   5 +-
 ... => Elasticsearch6DynamicTableFactoryTest.java} |  40 ++---
 .../table/Elasticsearch7DynamicSource.java | 135 +++
 ...java => Elasticsearch7DynamicTableFactory.java} |  74 -
 .../Elasticsearch7ApiCallBridge.java   |  23 ++-
 .../org.apache.flink.table.factories.Factory   |   2 +-
 ... => Elasticsearch7DynamicTableFactoryTest.java} |   2 +-
 .../table/Elasticsearch7DynamicSinkITCase.java |   5 +-
 ... => Elasticsearch7DynamicTableFactoryTest.java} |  40 ++---
 24 files changed, 976 insertions(+), 60 deletions(-)

diff --git a/flink-connector-elasticsearch-base/pom.xml b/flink-connector-elasticsearch-base/pom.xml
index 9c65924..f859b1b 100644
--- a/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connector-elasticsearch-base/pom.xml
@@ -164,6 +164,14 @@ under the License.
 		<scope>test</scope>
 	</dependency>
 
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-table-runtime</artifactId>
+		<version>${flink.version}</version>
+		<scope>provided</scope>
+		<optional>true</optional>
+	</dependency>
+

 

diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 8061d04..aab5cf5 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -19,9 +19,11 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.search.SearchRequest;
 
 import javax.annotation.Nullable;
 
@@ -61,6 +63,21 @@ public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
  */
 BulkProcessor.Builder createBulkProcessorBuilder(C client, 
BulkProcessor.Listener listener);
 
+/**
+ * Executes a search using the Search API.
+ *
+ * @param client the Elasticsearch client.
+ * @param searchRequest A request to execute search against one or more indices (or all).
+ */
+Tuple2 search(C client, SearchRequest searchRequest) throws IOException;
+
+/**
+ * Closes this client and releases any system resources associated with it.
+ *
+ * @param client the Elasticsearch client.
+ */
+void close(C client) throws IOException;
+
 /**
  * Extracts the cause of failure of a bulk item action.
  *
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java
new file mode 100644
index 000..1cde271
--- /dev/null
+++ 
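
With this change the Elasticsearch connector can serve as the dimension side of a lookup join. A sketch of the typical usage follows; the table names, schema, and option values are illustrative placeholders, and the probe-side table orders is assumed to be registered elsewhere.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class EsLookupJoinSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical dimension table backed by the Elasticsearch connector.
            tEnv.executeSql(
                    "CREATE TABLE dim_users ("
                            + "  user_id STRING,"
                            + "  user_name STRING"
                            + ") WITH ("
                            + "  'connector' = 'elasticsearch-7',"
                            + "  'hosts' = 'http://localhost:9200',"
                            + "  'index' = 'users')");

            // Each incoming order row probes Elasticsearch at processing time.
            tEnv.executeSql(
                            "SELECT o.order_id, u.user_name "
                                    + "FROM orders AS o "
                                    + "JOIN dim_users FOR SYSTEM_TIME AS OF o.proc_time AS u "
                                    + "ON o.user_id = u.user_id")
                    .print();
        }
    }
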

[flink] branch master updated: [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832)

2023-09-11 Thread zjureel
This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c33a6527d83 [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832)
c33a6527d83 is described below

commit c33a6527d8383dc571e0b648b8a29322416ab9d6
Author: Shammon FY 
AuthorDate: Mon Sep 11 20:34:34 2023 +0800

[FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832)

* [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway
---
 .../handler/statement/FetchResultsHandler.java |  19 ++-
 .../flink/table/gateway/rest/serde/ResultInfo.java |  14 +-
 .../rest/util/RowDataLocalTimeZoneConverter.java   | 187 +
 .../utils/RowDataLocalTimeZoneConverterTest.java   | 172 +++
 .../apache/flink/table/jdbc/FlinkResultSet.java|  43 +++--
 .../flink/table/jdbc/utils/ArrayFieldGetter.java   | 121 -
 .../flink/table/jdbc/FlinkStatementTest.java   |  52 --
 7 files changed, 464 insertions(+), 144 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
index 8cb4ff80802..4303158ea4e 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.gateway.rest.handler.statement;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils;
@@ -39,13 +40,18 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsRowForm
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
 import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter;
 import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /** Handler to fetch results. */
 public class FetchResultsHandler
@@ -100,13 +106,24 @@ public class FetchResultsHandler
 return CompletableFuture.completedFuture(
 new NotReadyFetchResultResponse(nextResultUri));
 } else {
+RowDataLocalTimeZoneConverter timeZoneConverter = null;
+if (rowFormat == RowFormat.JSON) {
+List<LogicalType> logicalTypeList =
+resultSet.getResultSchema().getColumnDataTypes().stream()
+.map(DataType::getLogicalType)
+.collect(Collectors.toList());
+timeZoneConverter =
+new RowDataLocalTimeZoneConverter(
+logicalTypeList,
+Configuration.fromMap(service.getSessionConfig(sessionHandle)));
+}
 return CompletableFuture.completedFuture(
 new FetchResultResponseBodyImpl(
 resultType,
 resultSet.isQueryResult(),
 resultSet.getJobID(),
 resultSet.getResultKind(),
-ResultInfo.createResultInfo(resultSet, rowFormat),
+ResultInfo.createResultInfo(resultSet, rowFormat, timeZoneConverter),
 nextResultUri));
 }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
index c34fb84dced..31957caca43 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
@@ -26,6 +26,7 @@ import 
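
With this change a JDBC client can read both types directly. A sketch of the usage (the jdbc:flink:// URL and port 8083 are placeholders for a running SQL Gateway):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.sql.Timestamp;

    public class FlinkJdbcTimestampSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083");
                    Statement stmt = conn.createStatement();
                    ResultSet rs =
                            stmt.executeQuery(
                                    "SELECT CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), "
                                            + "CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3))")) {
                while (rs.next()) {
                    // TIMESTAMP and TIMESTAMP_LTZ columns are now retrievable as
                    // java.sql.Timestamp values, converted with the session time zone.
                    Timestamp ts = rs.getTimestamp(1);
                    Timestamp tsLtz = rs.getTimestamp(2);
                    System.out.println(ts + " / " + tsLtz);
                }
            }
        }
    }
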

[flink] 02/02: [FLINK-32979] Migrate the usage of getDefaultTrigger(StreamExecutionEnvironment env) to getDefaultTrigger()

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 831cb8eff022db5543052c96716518c862c2
Author: Wencong Liu 
AuthorDate: Tue Aug 29 12:37:07 2023 +0800

[FLINK-32979] Migrate the usage of getDefaultTrigger(StreamExecutionEnvironment env) to getDefaultTrigger()

This closes #23073
---
 .../org/apache/flink/state/api/WindowedOperatorTransformation.java | 2 +-
 .../org/apache/flink/state/api/WindowedStateTransformation.java| 2 +-
 .../apache/flink/streaming/api/datastream/AllWindowedStream.java   | 2 +-
 .../org/apache/flink/streaming/api/datastream/WindowedStream.java  | 2 +-
 .../api/windowing/assigners/DynamicEventTimeSessionWindows.java| 7 +++
 .../windowing/assigners/DynamicProcessingTimeSessionWindows.java   | 7 +++
 .../streaming/api/windowing/assigners/EventTimeSessionWindows.java | 6 ++
 .../flink/streaming/api/windowing/assigners/GlobalWindows.java | 6 ++
 .../api/windowing/assigners/ProcessingTimeSessionWindows.java  | 6 ++
 .../streaming/api/windowing/assigners/SlidingEventTimeWindows.java | 6 ++
 .../api/windowing/assigners/SlidingProcessingTimeWindows.java  | 6 ++
 .../api/windowing/assigners/TumblingEventTimeWindows.java  | 6 ++
 .../api/windowing/assigners/TumblingProcessingTimeWindows.java | 6 ++
 .../operators/windowing/DynamicEventTimeSessionWindowsTest.java| 5 +
 .../windowing/DynamicProcessingTimeSessionWindowsTest.java | 5 +
 .../runtime/operators/windowing/EventTimeSessionWindowsTest.java   | 5 +
 .../streaming/runtime/operators/windowing/GlobalWindowsTest.java   | 5 +
 .../runtime/operators/windowing/MergingWindowSetTest.java  | 6 ++
 .../operators/windowing/ProcessingTimeSessionWindowsTest.java  | 5 +
 .../runtime/operators/windowing/SlidingEventTimeWindowsTest.java   | 5 +
 .../operators/windowing/SlidingProcessingTimeWindowsTest.java  | 5 +
 .../runtime/operators/windowing/TumblingEventTimeWindowsTest.java  | 5 +
 .../operators/windowing/TumblingProcessingTimeWindowsTest.java | 5 +
 23 files changed, 75 insertions(+), 40 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java
index f0fdc8cabd5..ed72d7eaf19 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java
@@ -84,7 +84,7 @@ public class WindowedOperatorTransformation<T, W extends Window> {
 this.builder =
 new WindowOperatorBuilder<>(
 windowAssigner,
-windowAssigner.getDefaultTrigger(null),
+windowAssigner.getDefaultTrigger(),
 input.getExecutionEnvironment().getConfig(),
 input.getType(),
 keySelector,
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
index fc0f85812e8..5d42ba7811c 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
@@ -77,7 +77,7 @@ public class WindowedStateTransformation<T, W extends Window> {
 this.builder =
 new WindowOperatorBuilder<>(
 windowAssigner,
-windowAssigner.getDefaultTrigger(null),
+windowAssigner.getDefaultTrigger(),
 input.getExecutionEnvironment().getConfig(),
 input.getType(),
 keySelector,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 33df648bab0..639d2af5f64 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -111,7 +111,7 @@ public class AllWindowedStream<T, W extends Window> {
 public AllWindowedStream(DataStream<T> input, WindowAssigner<? super T, W> windowAssigner) {
 this.input = input.keyBy(new NullByteKeySelector());
 this.windowAssigner = windowAssigner;
-this.trigger 

[flink] 01/02: [FLINK-32979] Add getDefaultTrigger() to WindowAssigner

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29a6e06901d4880a2d115f74f3955762378b874d
Author: Wencong Liu 
AuthorDate: Tue Aug 29 12:36:37 2023 +0800

[FLINK-32979] Add getDefaultTrigger() to WindowAssigner
---
 .../api/windowing/assigners/WindowAssigner.java| 29 +-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 43533357eab..0bc1b0a7d36 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -54,7 +54,34 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializable {
 public abstract Collection<W> assignWindows(
 T element, long timestamp, WindowAssignerContext context);
 
-/** Returns the default trigger associated with this {@code WindowAssigner}. */
+/**
+ * Returns the default trigger associated with this {@code WindowAssigner}.
+ *
+ * 1. If you override {@code getDefaultTrigger()}, the {@code getDefaultTrigger()} will be
+ * invoked and the {@code getDefaultTrigger(StreamExecutionEnvironment env)} won't be invoked.
+ * 2. If you don't override {@code getDefaultTrigger()}, the {@code
+ * getDefaultTrigger(StreamExecutionEnvironment env)} will be invoked in the default
+ * implementation of the {@code getDefaultTrigger()}.
+ */
+public Trigger<T, W> getDefaultTrigger() {
+return getDefaultTrigger(new StreamExecutionEnvironment());
+}
+
+/**
+ * Returns the default trigger associated with this {@code WindowAssigner}.
+ *
+ * @deprecated the method is deprecated since Flink 1.19 because {@code
+ * StreamExecutionEnvironment} is unused. Please use {@code getDefaultTrigger} and override
+ * this method with an empty body instead. 1. If you override {@code getDefaultTrigger()},
+ * the {@code getDefaultTrigger()} will be invoked and the {@code
+ * getDefaultTrigger(StreamExecutionEnvironment env)} won't be invoked. 2. If you don't
+ * override {@code getDefaultTrigger()}, the {@code
+ * getDefaultTrigger(StreamExecutionEnvironment env)} will be invoked in the default
+ * implementation of the {@code getDefaultTrigger()}.
+ * @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425229">
+ * FLIP-343: Remove parameter in WindowAssigner#getDefaultTrigger()</a>
+ */
+@Deprecated
+public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
 
 /**

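For illustration only, not part of the commit: a custom assigner written against the new API overrides the zero-argument `getDefaultTrigger()` and reduces the deprecated overload to a delegating stub. The class below (`MinuteWindows`) and its windowing logic are hypothetical; only the `WindowAssigner` and `Trigger` signatures come from the diff above.

``` java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Collection;
import java.util.Collections;

// Hypothetical assigner sketching the FLIP-343 migration; not committed code.
public class MinuteWindows extends WindowAssigner<Object, TimeWindow> {

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Assign each element to the one-minute window containing its timestamp.
        long start = timestamp - (timestamp % 60_000L);
        return Collections.singletonList(new TimeWindow(start, start + 60_000L));
    }

    // New style: the zero-argument method is the single source of truth.
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger() {
        return EventTimeTrigger.create();
    }

    // Deprecated overload kept only because it is still abstract; delegate to the new method.
    @Deprecated
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return getDefaultTrigger();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```

Because the framework now calls the zero-argument method first, the delegation above only matters for callers that still invoke the old signature directly.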


[flink] branch master updated (484da993e22 -> 831cb8eff02)

2023-09-11 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 484da993e22 [FLINK-31889][docs] Add documentation for 
implementing/loading enrichers
 new 29a6e06901d [FLINK-32979] Add getDefaultTrigger() to WindowAssigner
 new 831cb8eff02 [FLINK-32979] Migrate the usage of 
getDefaultTrigger(StreamExecutionEnvironment env) to getDefaultTrigger()

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../state/api/WindowedOperatorTransformation.java  |  2 +-
 .../state/api/WindowedStateTransformation.java |  2 +-
 .../api/datastream/AllWindowedStream.java  |  2 +-
 .../streaming/api/datastream/WindowedStream.java   |  2 +-
 .../assigners/DynamicEventTimeSessionWindows.java  |  7 ++
 .../DynamicProcessingTimeSessionWindows.java   |  7 ++
 .../assigners/EventTimeSessionWindows.java |  6 +
 .../api/windowing/assigners/GlobalWindows.java |  6 +
 .../assigners/ProcessingTimeSessionWindows.java|  6 +
 .../assigners/SlidingEventTimeWindows.java |  6 +
 .../assigners/SlidingProcessingTimeWindows.java|  6 +
 .../assigners/TumblingEventTimeWindows.java|  6 +
 .../assigners/TumblingProcessingTimeWindows.java   |  6 +
 .../api/windowing/assigners/WindowAssigner.java| 29 +-
 .../DynamicEventTimeSessionWindowsTest.java|  5 +---
 .../DynamicProcessingTimeSessionWindowsTest.java   |  5 +---
 .../windowing/EventTimeSessionWindowsTest.java |  5 +---
 .../operators/windowing/GlobalWindowsTest.java |  5 +---
 .../operators/windowing/MergingWindowSetTest.java  |  6 +
 .../ProcessingTimeSessionWindowsTest.java  |  5 +---
 .../windowing/SlidingEventTimeWindowsTest.java |  5 +---
 .../SlidingProcessingTimeWindowsTest.java  |  5 +---
 .../windowing/TumblingEventTimeWindowsTest.java|  5 +---
 .../TumblingProcessingTimeWindowsTest.java |  5 +---
 24 files changed, 103 insertions(+), 41 deletions(-)



[flink-connector-aws] branch main updated: [FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer

2023-09-11 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
 new b996d1d  [FLINK-24943][Connectors / Kinesis] Explicitly create 
KryoSerializer for SequenceNumber class in Kinesis Consumer
b996d1d is described below

commit b996d1d854a221787cfbb46802b9f3c5bb0e6651
Author: Alexander Egorov 
AuthorDate: Mon Sep 11 06:10:27 2023 -0500

[FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for 
SequenceNumber class in Kinesis Consumer
---
 .../connectors/kinesis/FlinkKinesisConsumer.java   | 10 ++--
 .../connectors/kinesis/util/KinesisStateUtil.java  | 56 
 .../kinesis/FlinkKinesisConsumerTest.java  | 61 --
 .../testutils/TestableFlinkKinesisConsumer.java| 29 ++
 4 files changed, 120 insertions(+), 36 deletions(-)

diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3647176..c229a1c 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -47,6 +46,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.table.DefaultShardAssignerFactory;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.util.InstantiationUtil;
@@ -441,16 +441,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo =
-                new TupleTypeInfo<>(
-                        TypeInformation.of(StreamShardMetadata.class),
-                        TypeInformation.of(SequenceNumber.class));
 
         sequenceNumsStateForCheckpoint =
                 context.getOperatorStateStore()
                         .getUnionListState(
                                 new ListStateDescriptor<>(
-                                        sequenceNumsStateStoreName, shardsStateTypeInfo));
+                                        sequenceNumsStateStoreName,
+                                        KinesisStateUtil.createShardsStateSerializer(
+                                                getRuntimeContext().getExecutionConfig())));
 
         if (context.isRestored()) {
             if (sequenceNumsToRestore == null) {
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java
new file mode 100644
index 000..eba4440
--- /dev/null
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

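The message is truncated above, so the body of the new `KinesisStateUtil` is not shown. As a rough sketch only: the intent visible from `initializeState` is to build the shards-state serializer explicitly, wiring a `KryoSerializer` for `SequenceNumber` rather than relying on `TypeInformation`'s generic fallback. The class and method below are assumptions for illustration, not the committed code.

``` java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

// Illustrative stand-in for KinesisStateUtil.createShardsStateSerializer; the
// real helper may differ in naming and structure.
final class ShardsStateSerializerSketch {

    @SuppressWarnings("unchecked")
    static TupleSerializer<Tuple2<StreamShardMetadata, SequenceNumber>> create(
            ExecutionConfig executionConfig) {
        TypeSerializer<?>[] fieldSerializers =
                new TypeSerializer<?>[] {
                    // Shard metadata keeps its derived serializer.
                    TypeInformation.of(StreamShardMetadata.class).createSerializer(executionConfig),
                    // SequenceNumber gets an explicit Kryo serializer, the point of the commit.
                    new KryoSerializer<>(SequenceNumber.class, executionConfig)
                };
        return new TupleSerializer<>(
                (Class<Tuple2<StreamShardMetadata, SequenceNumber>>) (Class<?>) Tuple2.class,
                fieldSerializers);
    }
}
```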
[flink] branch release-1.17 updated: [FLINK-32962][python] Remove pip version check on installing dependencies.

2023-09-11 Thread hong
This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new f786dbea0be [FLINK-32962][python] Remove pip version check on 
installing dependencies.
f786dbea0be is described below

commit f786dbea0be25cc62dc10311a8d64d9de2ee20dd
Author: Aleksandr Pilipenko 
AuthorDate: Fri Aug 25 21:37:00 2023 +0100

[FLINK-32962][python] Remove pip version check on installing dependencies.
---
 .../python/util/PythonEnvironmentManagerUtils.java | 25 +++---
 1 file changed, 3 insertions(+), 22 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
index 2aecd009e6d..b271c41a63f 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
@@ -58,12 +58,6 @@ public class PythonEnvironmentManagerUtils {
                     + "import sysconfig;"
                     + "print(sysconfig.get_path('platlib', vars={'base': sys.argv[1], 'platbase': sys.argv[1]}))";
 
-    private static final String CHECK_PIP_VERSION_SCRIPT =
-            "import sys;"
-                    + "from pkg_resources import get_distribution, parse_version;"
-                    + "pip_version = get_distribution('pip').version;"
-                    + "print(parse_version(pip_version) >= parse_version(sys.argv[1]))";
-
     private static final String GET_RUNNER_DIR_SCRIPT =
             "import pyflink;"
                     + "import os;"
@@ -105,13 +99,9 @@ public class PythonEnvironmentManagerUtils {
                         "install",
                         "--ignore-installed",
                         "-r",
-                        requirementsFilePath));
-        if (isPipVersionGreaterEqual("8.0.0", pythonExecutable, environmentVariables)) {
-            commands.addAll(Arrays.asList("--prefix", requirementsInstallDir));
-        } else {
-            commands.addAll(
-                    Arrays.asList("--install-option", "--prefix=" + requirementsInstallDir));
-        }
+                        requirementsFilePath,
+                        "--prefix",
+                        requirementsInstallDir));
         if (requirementsCacheDir != null) {
             commands.addAll(Arrays.asList("--no-index", "--find-links", requirementsCacheDir));
         }
@@ -173,15 +163,6 @@ public class PythonEnvironmentManagerUtils {
         return String.join(File.pathSeparator, out.trim().split("\n"));
     }
 
-    private static boolean isPipVersionGreaterEqual(
-            String pipVersion, String pythonExecutable, Map<String, String> environmentVariables)
-            throws IOException {
-        String[] commands =
-                new String[] {pythonExecutable, "-c", CHECK_PIP_VERSION_SCRIPT, pipVersion};
-        String out = execute(commands, environmentVariables, false);
-        return Boolean.parseBoolean(out.trim());
-    }
-
     private static String execute(
             String[] commands, Map<String, String> environmentVariables, boolean logDetails)
             throws IOException {

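Net effect of the change: the requirements install no longer probes the pip version through `pkg_resources` and always passes `--prefix` directly, which pip has supported since 8.0.0. A simplified sketch of the resulting invocation, assuming pip is launched through the Python executable; the executable name and paths are placeholders, not values from the commit.

``` java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration of the unconditional command now built for the requirements install.
public class PipCommandSketch {
    public static void main(String[] args) {
        List<String> commands = new ArrayList<>(Arrays.asList(
                "python", "-m", "pip",                     // placeholder executable
                "install",
                "--ignore-installed",
                "-r", "/tmp/requirements.txt",             // placeholder requirements file
                "--prefix", "/tmp/requirements-target"));  // placeholder install dir
        System.out.println(String.join(" ", commands));
    }
}
```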


[flink] branch master updated: [FLINK-31889][docs] Add documentation for implementing/loading enrichers

2023-09-11 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 484da993e22 [FLINK-31889][docs] Add documentation for 
implementing/loading enrichers
484da993e22 is described below

commit 484da993e22c30b0c9e2bcf747036a79443519fc
Author: Panagiotis Garefalakis 
AuthorDate: Sun Apr 23 19:07:20 2023 -0700

[FLINK-31889][docs] Add documentation for implementing/loading enrichers
---
 .../docs/deployment/advanced/failure_enrichers.md  | 110 +
 .../docs/deployment/advanced/failure_enrichers.md  | 110 +
 2 files changed, 220 insertions(+)

diff --git a/docs/content.zh/docs/deployment/advanced/failure_enrichers.md b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
new file mode 100644
index 000..0bbf45efd68
--- /dev/null
+++ b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
@@ -0,0 +1,110 @@
+---
+title: "Failure Enrichers"
+nav-title: failure-enrichers
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Custom failure enrichers
+Flink provides a pluggable interface for users to register their custom logic 
and enrich failures with extra metadata labels (string key-value pairs).
+This enables users to implement their own failure enrichment plugins to 
categorize job failures, expose custom metrics, or make calls to external 
notification systems.
+
+FailureEnrichers are triggered every time an exception is reported at runtime 
by the JobManager.
+Every FailureEnricher may asynchronously return labels associated with the 
failure that are then exposed via the JobManager's REST API (e.g., a 
'type:System' label implying the failure is categorized as a system error).
+
+
+### Implement a plugin for your custom enricher
+
+To implement a custom FailureEnricher plugin, you need to:
+
+- Add your own FailureEnricher by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="FailureEnricher" >}} interface.
+
+- Add your own FailureEnricherFactory by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java" name="FailureEnricherFactory" >}} interface.
+
+- Add a service entry. Create a file `META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory` which contains the class name of your failure enricher factory class (see the [Java Service Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html) docs for more details).
+
+
+Then, create a jar which includes your `FailureEnricher`, `FailureEnricherFactory`, `META-INF/services/` and all external dependencies.
+Make a directory in `plugins/` of your Flink distribution with an arbitrary name, e.g. "failure-enrichment", and put the jar into this directory.
+See [Flink Plugin]({% link deployment/filesystems/plugins.md %}) for more details.
+
+{{< hint warning >}}
+Note that every FailureEnricher should define a set of {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="output keys" >}} that may be associated with values. This set of keys has to be unique; otherwise, all enrichers with overlapping keys will be ignored.
+{{< /hint >}}
+
+FailureEnricherFactory example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class TestFailureEnricherFactory implements FailureEnricherFactory {
+
+    @Override
+    public FailureEnricher createFailureEnricher(Configuration conf) {
+        return new CustomEnricher();
+    }
+}
+```
+
+FailureEnricher example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class CustomEnricher implements FailureEnricher {
+    private final Set<String> outputKeys;
+
+    public CustomEnricher() {
+        this.outputKeys = Collections.singleton("labelKey");
+    }
+
+    @Override
+    public Set<String> getOutputKeys() {
+        return outputKeys;
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(
+            Throwable cause, Context context) {
+        return CompletableFuture.completedFuture(
+                Collections.singletonMap("labelKey", "labelValue"));
+    }
+}
+```
+
+### Configuration
+
+The JobManager loads FailureEnricher plugins at startup. To make sure your FailureEnrichers are loaded, all class names should be defined as part of the [jobmanager.failure-enrichers configuration]({{< ref "docs/deployment/config#jobmanager-failure-enrichers" >}}).
+If this configuration is empty, NO enrichers will be started. Example:
+```
+jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher
+```
+
+### Validation
+
+To validate that your FailureEnricher is loaded, you can check the JobManager logs for the following line:
+```
+Found failure enricher

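A note on the service-entry step in the message above: the `META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory` file is plain text listing fully qualified factory class names, one per line. For the `TestFailureEnricherFactory` example it would read:

```
org.apache.flink.test.plugin.jar.failure.TestFailureEnricherFactory
```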
[flink] branch release-1.18 updated: [FLINK-31889][docs] Add documentation for implementing/loading enrichers

2023-09-11 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 7339a5359cb [FLINK-31889][docs] Add documentation for 
implementing/loading enrichers
7339a5359cb is described below

commit 7339a5359cb1ea1e718c00037bc322b02cc523e2
Author: Panagiotis Garefalakis 
AuthorDate: Sun Apr 23 19:07:20 2023 -0700

[FLINK-31889][docs] Add documentation for implementing/loading enrichers
---
 .../docs/deployment/advanced/failure_enrichers.md  | 110 +
 .../docs/deployment/advanced/failure_enrichers.md  | 110 +
 2 files changed, 220 insertions(+)

diff --git a/docs/content.zh/docs/deployment/advanced/failure_enrichers.md b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
new file mode 100644
index 000..0bbf45efd68
--- /dev/null
+++ b/docs/content.zh/docs/deployment/advanced/failure_enrichers.md
@@ -0,0 +1,110 @@
+---
+title: "Failure Enrichers"
+nav-title: failure-enrichers
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Custom failure enrichers
+Flink provides a pluggable interface for users to register their custom logic 
and enrich failures with extra metadata labels (string key-value pairs).
+This enables users to implement their own failure enrichment plugins to 
categorize job failures, expose custom metrics, or make calls to external 
notification systems.
+
+FailureEnrichers are triggered every time an exception is reported at runtime 
by the JobManager.
+Every FailureEnricher may asynchronously return labels associated with the 
failure that are then exposed via the JobManager's REST API (e.g., a 
'type:System' label implying the failure is categorized as a system error).
+
+
+### Implement a plugin for your custom enricher
+
+To implement a custom FailureEnricher plugin, you need to:
+
+- Add your own FailureEnricher by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="FailureEnricher" >}} interface.
+
+- Add your own FailureEnricherFactory by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java" name="FailureEnricherFactory" >}} interface.
+
+- Add a service entry. Create a file `META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory` which contains the class name of your failure enricher factory class (see the [Java Service Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html) docs for more details).
+
+
+Then, create a jar which includes your `FailureEnricher`, `FailureEnricherFactory`, `META-INF/services/` and all external dependencies.
+Make a directory in `plugins/` of your Flink distribution with an arbitrary name, e.g. "failure-enrichment", and put the jar into this directory.
+See [Flink Plugin]({% link deployment/filesystems/plugins.md %}) for more details.
+
+{{< hint warning >}}
+Note that every FailureEnricher should define a set of {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java" name="output keys" >}} that may be associated with values. This set of keys has to be unique; otherwise, all enrichers with overlapping keys will be ignored.
+{{< /hint >}}
+
+FailureEnricherFactory example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class TestFailureEnricherFactory implements FailureEnricherFactory {
+
+    @Override
+    public FailureEnricher createFailureEnricher(Configuration conf) {
+        return new CustomEnricher();
+    }
+}
+```
+
+FailureEnricher example:
+
+``` java
+package org.apache.flink.test.plugin.jar.failure;
+
+public class CustomEnricher implements FailureEnricher {
+    private final Set<String> outputKeys;
+
+    public CustomEnricher() {
+        this.outputKeys = Collections.singleton("labelKey");
+    }
+
+    @Override
+    public Set<String> getOutputKeys() {
+        return outputKeys;
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(
+            Throwable cause, Context context) {
+        return CompletableFuture.completedFuture(
+                Collections.singletonMap("labelKey", "labelValue"));
+    }
+}
+```
+
+### Configuration
+
+The JobManager loads FailureEnricher plugins at startup. To make sure your FailureEnrichers are loaded, all class names should be defined as part of the [jobmanager.failure-enrichers configuration]({{< ref "docs/deployment/config#jobmanager-failure-enrichers" >}}).
+If this configuration is empty, NO enrichers will be started. Example:
+```
+jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher
+```
+
+### Validation
+
+To validate that your FailureEnricher is loaded, you can check the JobManager logs for the following line:
+```
+Found failure enricher