(flink) branch master updated: [FLINK-33779][table] Cleanup usage of deprecated BaseExpressions#cast

2023-12-15 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new ef2b626d671 [FLINK-33779][table] Cleanup usage of deprecated 
BaseExpressions#cast
ef2b626d671 is described below

commit ef2b626d67147797e992ec3b338bafdb4e5ab1c7
Author: Jacky Lau 
AuthorDate: Sat Dec 16 06:59:10 2023 +0800

[FLINK-33779][table] Cleanup usage of deprecated BaseExpressions#cast
---
 .../apache/flink/table/planner/plan/stream/table/AggregateTest.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/AggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/AggregateTest.scala
index ade4c8b1f03..829ea0ba9fb 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/AggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/AggregateTest.scala
@@ -123,7 +123,7 @@ class AggregateTest extends TableTestBase {
 
 val resultTable = table
   .groupBy('b)
-  .select('b, 'a.cast(BasicTypeInfo.DOUBLE_TYPE_INFO).avg)
+  .select('b, 'a.cast(DataTypes.DOUBLE()).avg)
 
 util.verifyExecPlan(resultTable)
   }



(flink-connector-jdbc) branch dependabot/maven/org.apache.commons-commons-compress-1.24.0 created (now f024ecac)

2023-12-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/org.apache.commons-commons-compress-1.24.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


  at f024ecac Bump org.apache.commons:commons-compress from 1.23.0 to 1.24.0

No new revisions were added by this update.



(flink-connector-jdbc) branch main updated (b477d452 -> f8de82b4)

2023-12-15 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


from b477d452 [FLINK-33662] Bump h2 from 2.1.210 to 2.2.224. This closes #69
 add f8de82b4 [FLINK-33787][jdbc] Java 17 support for jdbc connector

No new revisions were added by this update.

Summary of changes:
 .github/workflows/push_pr.yml | 7 ++-
 .github/workflows/weekly.yml  | 3 +++
 flink-connector-jdbc/pom.xml  | 4 
 pom.xml   | 3 +++
 4 files changed, 16 insertions(+), 1 deletion(-)



(flink) branch release-1.17 updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

2023-12-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 58f8162613d [FLINK-33541][table-planner] RAND and RAND_INTEGER should 
return type nullable if the arguments are nullable
58f8162613d is described below

commit 58f8162613d9f615e60fb0c9e23692d25469d6f0
Author: xuyang 
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

[FLINK-33541][table-planner] RAND and RAND_INTEGER should return type 
nullable if the arguments are nullable

Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java  |  4 +-
 .../functions/sql/FlinkSqlOperatorTable.java   |  8 +-
 .../table/planner/codegen/calls/RandCallGen.scala  |  5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java  | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala  | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 5bbdca9a054..b96422625a8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1397,7 +1397,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, 
sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1411,7 +1411,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 
 public static final BuiltInFunctionDefinition BIN =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index cb2ff52c6e8..04cf1540bc8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -922,7 +922,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -940,7 +942,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) 
extends CallGenerato
 }
 
 if 

(flink) branch release-1.18 updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

2023-12-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new d6081815000 [FLINK-33541][table-planner] RAND and RAND_INTEGER should 
return type nullable if the arguments are nullable
d6081815000 is described below

commit d60818150005661006a71e4155fc605d7543362b
Author: xuyang 
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

[FLINK-33541][table-planner] RAND and RAND_INTEGER should return type 
nullable if the arguments are nullable

Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java  |  4 +-
 .../functions/sql/FlinkSqlOperatorTable.java   |  8 +-
 .../table/planner/codegen/calls/RandCallGen.scala  |  5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java  | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala  | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e653d1d6463..82197822dc5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1576,7 +1576,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, 
sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1590,7 +1590,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 
 public static final BuiltInFunctionDefinition BIN =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 9769613cd2d..1a98081dd79 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -940,7 +940,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -958,7 +960,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) 
extends CallGenerato
 }
 
 if 

(flink) branch master updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

2023-12-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 45f966e8c3c [FLINK-33541][table-planner] RAND and RAND_INTEGER should 
return type nullable if the arguments are nullable
45f966e8c3c is described below

commit 45f966e8c3c5e903b3843391874f7d2478122d8c
Author: xuyang 
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

[FLINK-33541][table-planner] RAND and RAND_INTEGER should return type 
nullable if the arguments are nullable

Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java  |  4 +-
 .../functions/sql/FlinkSqlOperatorTable.java   |  8 +-
 .../table/planner/codegen/calls/RandCallGen.scala  |  5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java  | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala  | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 89ac66d18f6..b65afdc4284 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1644,7 +1644,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, 
sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1658,7 +1658,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 
 public static final BuiltInFunctionDefinition BIN =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 9769613cd2d..1a98081dd79 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -940,7 +940,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -958,7 +960,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) 
extends CallGenerato
 }
 
 if 

(flink) branch master updated: [FLINK-33853][runtime][JUnit5 Migration] Migrate Junit5 for DeclarativeSlotPoolBridge test classes of runtime module (#23932)

2023-12-15 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1136ed50311 [FLINK-33853][runtime][JUnit5 Migration] Migrate Junit5 
for DeclarativeSlotPoolBridge test classes of runtime module (#23932)
1136ed50311 is described below

commit 1136ed50311a18c0b5773ae982330cc2936eba3d
Author: Yuepeng Pan 
AuthorDate: Fri Dec 15 23:15:37 2023 +0800

[FLINK-33853][runtime][JUnit5 Migration] Migrate Junit5 for 
DeclarativeSlotPoolBridge test classes of runtime module (#23932)
---
 ...tiveSlotPoolBridgePreferredAllocationsTest.java |   7 +-
 ...arativeSlotPoolBridgeRequestCompletionTest.java |  25 ++---
 ...ativeSlotPoolBridgeResourceDeclarationTest.java | 103 +
 .../slotpool/DeclarativeSlotPoolBridgeTest.java|  73 +++
 4 files changed, 89 insertions(+), 119 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
index e695852375d..1d33f299c5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
@@ -28,11 +28,9 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.clock.SystemClock;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nonnull;
 
@@ -44,11 +42,10 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-@ExtendWith(TestLoggerExtension.class)
-public class DeclarativeSlotPoolBridgePreferredAllocationsTest {
+class DeclarativeSlotPoolBridgePreferredAllocationsTest {
 
 @Test
-public void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() 
throws Exception {
+void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws 
Exception {
 final DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
 new DeclarativeSlotPoolBridge(
 new JobID(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
index 40dd0eeabe1..6c8d5701952 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
@@ -28,11 +28,10 @@ import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.CheckedSupplier;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -43,27 +42,23 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. */
-public class DeclarativeSlotPoolBridgeRequestCompletionTest extends TestLogger 
{
+class DeclarativeSlotPoolBridgeRequestCompletionTest {
 
 private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT;
 
 private TestingResourceManagerGateway resourceManagerGateway;
 
-@Before
-public void setUp() throws Exception {
+@BeforeEach
+void setUp() {
 resourceManagerGateway = new TestingResourceManagerGateway();
 }
 
 /** Tests that the {@link DeclarativeSlotPoolBridge} completes slots in 
request order. */
 @Test
-public void testRequestsAreCompletedInRequestOrder() {
+

(flink-connector-shared-utils) branch parent_pom updated: [hotfix] set version to 1.1.0 and not 1.0.1 (#31)

2023-12-15 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch parent_pom
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git


The following commit(s) were added to refs/heads/parent_pom by this push:
 new 2c911d3  [hotfix] set version to 1.1.0 and not 1.0.1 (#31)
2c911d3 is described below

commit 2c911d309dedefa618c9fad1844cf10b048029e3
Author: Etienne Chauchot 
AuthorDate: Fri Dec 15 13:57:11 2023 +0100

[hotfix] set version to 1.1.0 and not 1.0.1 (#31)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 3afa1b3..c36e02d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
 
 org.apache.flink
 flink-connector-parent
-1.0.1
+1.1.0
 pom
 
 https://flink.apache.org



(flink) 02/02: [FLINK-20772][State] Tests for null value in TtlValueState#update

2023-12-15 Thread hangxiang
This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9aa7a3cdbb6bca1746c046dd5536026e43065cec
Author: Zakelly 
AuthorDate: Fri Dec 15 12:37:01 2023 +0800

[FLINK-20772][State] Tests for null value in TtlValueState#update
---
 .../flink/runtime/state/ttl/TtlStateTestBase.java | 19 +++
 .../runtime/state/ttl/TtlValueStateTestContext.java   | 16 
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 5eb5c6c2f8f..bd658dbea44 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -187,6 +187,25 @@ public abstract class TtlStateTestBase {
 .isEqualTo(ctx().emptyValue);
 }
 
+@TestTemplate
+void testValueSetNull() throws Exception {
+// Only test this on value state
+assumeThat(ctx()).isInstanceOf(TtlValueStateTestContext.class);
+
+initTest(
+StateTtlConfig.UpdateType.OnCreateAndWrite,
+StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp);
+
+ctx().update(ctx().updateUnexpired);
+assertThat(ctx().get())
+.withFailMessage(UPDATED_UNEXPIRED_AVAIL)
+.isEqualTo(ctx().getUnexpired);
+
+// Update null and we get empty.
+ctx().update(null);
+
assertThat(ctx().get()).withFailMessage(EXPIRED_UNAVAIL).isEqualTo(ctx().emptyValue);
+}
+
 @TestTemplate
 void testExactExpirationOnWrite() throws Exception {
 initTest(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
index 1e22d404957..1dfdf339852 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
@@ -21,14 +21,14 @@ package org.apache.flink.runtime.state.ttl;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 
 /** Test suite for {@link TtlValueState}. */
 class TtlValueStateTestContext
-extends TtlStateTestContextBase, 
String, String> {
-private static final String TEST_VAL1 = "test value1";
-private static final String TEST_VAL2 = "test value2";
-private static final String TEST_VAL3 = "test value3";
+extends TtlStateTestContextBase, Long, 
Long> {
+private static final Long TEST_VAL1 = 11L;
+private static final Long TEST_VAL2 = 21L;
+private static final Long TEST_VAL3 = 31L;
 
 @Override
 void initTestValues() {
@@ -45,16 +45,16 @@ class TtlValueStateTestContext
 @Override
 public  StateDescriptor 
createStateDescriptor() {
 return (StateDescriptor)
-new ValueStateDescriptor<>(getName(), 
StringSerializer.INSTANCE);
+new ValueStateDescriptor<>(getName(), LongSerializer.INSTANCE);
 }
 
 @Override
-public void update(String value) throws Exception {
+public void update(Long value) throws Exception {
 ttlState.update(value);
 }
 
 @Override
-public String get() throws Exception {
+public Long get() throws Exception {
 return ttlState.value();
 }
 



(flink) branch master updated (20a328d80a1 -> 9aa7a3cdbb6)

2023-12-15 Thread hangxiang
This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 20a328d80a1 [FLINK-33767] Deleting TemporalJoinJsonPlanTest.java and 
TemporalJoinJsonPlanITCase.java
 new ac1ed6b60a6 [FLINK-20772][State] Make TtlValueState#update follow the 
description of interface about null values
 new 9aa7a3cdbb6 [FLINK-20772][State] Tests for null value in 
TtlValueState#update

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/state/ttl/TtlValueState.java |  2 +-
 .../flink/runtime/state/ttl/TtlStateTestBase.java | 19 +++
 .../runtime/state/ttl/TtlValueStateTestContext.java   | 16 
 3 files changed, 28 insertions(+), 9 deletions(-)



(flink) 01/02: [FLINK-20772][State] Make TtlValueState#update follow the description of interface about null values

2023-12-15 Thread hangxiang
This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac1ed6b60a6358b8a87effc26e2e774c36ed0a37
Author: Zakelly 
AuthorDate: Fri Dec 15 12:36:38 2023 +0800

[FLINK-20772][State] Make TtlValueState#update follow the description of 
interface about null values
---
 .../src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
index 1d68a45d864..344f0b5e949 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
@@ -48,7 +48,7 @@ class TtlValueState
 @Override
 public void update(T value) throws IOException {
 accessCallback.run();
-original.update(wrapWithTs(value));
+original.update(value == null ? null : wrapWithTs(value));
 }
 
 @Nullable



(flink) branch master updated (d4a3687aacd -> 20a328d80a1)

2023-12-15 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from d4a3687aacd [FLIP-321] Update the docs to add migration periods for 
deprecated APIs. (#23865)
 new 46d817d8d29 [FLINK-33767] Implement restore tests for TemporalJoin node
 new 20a328d80a1 [FLINK-33767] Deleting TemporalJoinJsonPlanTest.java and 
TemporalJoinJsonPlanITCase.java

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/test/program/TableTestProgram.java |  22 ++
 ...TestStep.java => TemporalFunctionTestStep.java} |  44 ++--
 .../apache/flink/table/test/program/TestStep.java  |   1 +
 .../exec/stream/TemporalJoinJsonPlanTest.java  | 101 -
 ...storeTest.java => TemporalJoinRestoreTest.java} |  12 +-
 .../exec/stream/TemporalJoinTestPrograms.java  | 103 +
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   3 +
 .../jsonplan/TemporalJoinJsonPlanITCase.java   | 107 --
 .../plan/temporal-join-table-join.json}| 213 +--
 .../temporal-join-table-join/savepoint/_metadata   | Bin 0 -> 14926 bytes
 .../plan/temporal-join-temporal-function.json} | 235 ++---
 .../savepoint/_metadata| Bin 0 -> 14926 bytes
 12 files changed, 450 insertions(+), 391 deletions(-)
 copy 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/{FunctionTestStep.java
 => TemporalFunctionTestStep.java} (58%)
 delete mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java
 copy 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/{TemporalSortRestoreTest.java
 => TemporalJoinRestoreTest.java} (77%)
 create mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
 delete mode 100644 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
 rename 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.out
 => 
restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/plan/temporal-join-table-join.json}
 (69%)
 create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/savepoint/_metadata
 rename 
flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testJoinTemporalFunction.out
 => 
restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/plan/temporal-join-temporal-function.json}
 (66%)
 create mode 100644 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/savepoint/_metadata



(flink) 02/02: [FLINK-33767] Deleting TemporalJoinJsonPlanTest.java and TemporalJoinJsonPlanITCase.java

2023-12-15 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20a328d80a1dbc50974cf3de9f4b6178246f6dee
Author: Jim Hughes 
AuthorDate: Tue Dec 12 14:04:48 2023 -0500

[FLINK-33767] Deleting TemporalJoinJsonPlanTest.java and 
TemporalJoinJsonPlanITCase.java
---
 .../exec/stream/TemporalJoinJsonPlanTest.java  | 101 -
 .../jsonplan/TemporalJoinJsonPlanITCase.java   | 107 --
 .../testJoinTemporalFunction.out   | 421 -
 .../testTemporalTableJoin.json | 421 -
 .../testTemporalTableJoin.out  | 421 -
 5 files changed, 1471 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java
deleted file mode 100644
index da3e6eaebae..000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.functions.TemporalTableFunction;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.apache.flink.table.api.Expressions.$;
-
-/** Test json serialization/deserialization for TemporalJoin. */
-class TemporalJoinJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@BeforeEach
-void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-tEnv = util.getTableEnv();
-
-tEnv.executeSql(
-"CREATE TABLE Orders (\n"
-+ " amount INT,\n"
-+ " currency STRING,\n"
-+ " rowtime TIMESTAMP(3),\n"
-+ " proctime AS PROCTIME(),\n"
-+ " WATERMARK FOR rowtime AS rowtime\n"
-+ ") WITH (\n"
-+ " 'connector' = 'values'\n"
-+ ")");
-tEnv.executeSql(
-"CREATE TABLE RatesHistory (\n"
-+ " currency STRING,\n"
-+ " rate INT,\n"
-+ " rowtime TIMESTAMP(3),\n"
-+ " WATERMARK FOR rowtime AS rowtime,\n"
-+ " PRIMARY KEY(currency) NOT ENFORCED\n"
-+ ") WITH (\n"
-+ " 'connector' = 'values'\n"
-+ ")");
-TemporalTableFunction ratesHistory =
-
tEnv.from("RatesHistory").createTemporalTableFunction($("rowtime"), 
$("currency"));
-tEnv.createTemporarySystemFunction("Rates", ratesHistory);
-}
-
-@Test
-void testJoinTemporalFunction() {
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ "  a int\n"
-+ ") with (\n"
-+ "  'connector' = 'values',\n"
-+ "  'table-sink-class' = 'DEFAULT')";
-tEnv.executeSql(sinkTableDdl);
-util.verifyJsonPlan(
-"INSERT INTO MySink "
-+ "SELECT amount * r.rate "
-+ "FROM Orders AS o,  "
-+ "LATERAL TABLE (Rates(o.rowtime)) AS r "
-+ "WHERE o.currency = r.currency ");
-}
-
-@Test
-void testTemporalTableJoin() {
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ "  a int\n"
-+ ") with (\n"
- 

(flink) 01/02: [FLINK-33767] Implement restore tests for TemporalJoin node

2023-12-15 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 46d817d8d297b50fe91b5fb9471bda791a6f4319
Author: Jim Hughes 
AuthorDate: Mon Dec 11 13:26:42 2023 -0500

[FLINK-33767] Implement restore tests for TemporalJoin node

This closes #23916
---
 .../flink/table/test/program/TableTestProgram.java |  22 +
 .../test/program/TemporalFunctionTestStep.java |  67 +++
 .../apache/flink/table/test/program/TestStep.java  |   1 +
 .../nodes/exec/stream/TemporalJoinRestoreTest.java |  40 ++
 .../exec/stream/TemporalJoinTestPrograms.java  | 103 +
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   3 +
 .../testTemporalTableJoin.json | 421 ++
 .../plan/temporal-join-table-join.json | 494 +
 .../temporal-join-table-join/savepoint/_metadata   | Bin 0 -> 14926 bytes
 .../plan/temporal-join-temporal-function.json  | 494 +
 .../savepoint/_metadata| Bin 0 -> 14926 bytes
 11 files changed, 1645 insertions(+)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
index f5323b0..7d4c4b45eb5 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.test.program;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior;
 import 
org.apache.flink.table.test.program.FunctionTestStep.FunctionPersistence;
@@ -176,6 +177,14 @@ public class TableTestProgram {
 .collect(Collectors.toList());
 }
 
+/** Convenience method to avoid casting. It assumes that the order of 
steps is not important. */
+public List getSetupTemporalFunctionTestSteps() {
+return setupSteps.stream()
+.filter(s -> s.getKind() == TestKind.TEMPORAL_FUNCTION)
+.map(TemporalFunctionTestStep.class::cast)
+.collect(Collectors.toList());
+}
+
 /**
  * Convenience method to avoid boilerplate code. It assumes that only a 
single SQL statement is
  * tested.
@@ -231,6 +240,19 @@ public class TableTestProgram {
 return this;
 }
 
+/** Setup step for registering a temporary system function. */
+public Builder setupTemporarySystemTemporalTableFunction(
+String name, String table, Expression timeAttribute, 
Expression primaryKey) {
+this.setupSteps.add(
+new TemporalFunctionTestStep(
+TemporalFunctionTestStep.FunctionBehavior.SYSTEM,
+name,
+table,
+timeAttribute,
+primaryKey));
+return this;
+}
+
 /** Setup step for registering a temporary catalog function. */
 public Builder setupTemporaryCatalogFunction(
 String name, Class function) {
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java
new file mode 100644
index 000..206f7fa38c1
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.TableEnvironment;
+import 

(flink-connector-elasticsearch) branch main updated: [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling

2023-12-15 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
 new 40774fa  [FLINK-32028][connectors/elasticsearch] Allow customising 
bulk failure handling
40774fa is described below

commit 40774fad0f4ecd1a0d104dcb339e6bb860b0a4bf
Author: Peter Fischer 
AuthorDate: Fri Dec 1 12:29:32 2023 +0100

[FLINK-32028][connectors/elasticsearch] Allow customising bulk failure 
handling

Extracted `BulkResponseInspector` interface to allow custom handling of 
(partially) failed bulk requests. If not overridden, default behaviour remains 
unchanged and partial failures are escalated.

* fixes https://issues.apache.org/jira/browse/FLINK-32028
* allows custom metrics to be exposed
---
 .../elasticsearch/sink/BulkResponseInspector.java  |  60 +
 .../elasticsearch/sink/ElasticsearchSink.java  |  12 +-
 .../sink/ElasticsearchSinkBuilderBase.java |  63 +-
 .../elasticsearch/sink/ElasticsearchWriter.java| 112 -
 .../elasticsearch/sink/FailureHandler.java |  36 ++
 .../sink/DefaultBulkResponseInspectorTest.java | 127 +++
 .../sink/ElasticsearchSinkBuilderBaseTest.java | 136 +
 .../sink/ElasticsearchWriterITCase.java|   2 +
 8 files changed, 513 insertions(+), 35 deletions(-)

diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java
new file mode 100644
index 000..9f4ce10
--- /dev/null
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+/** Callback for inspecting a {@link BulkResponse}. */
+@PublicEvolving
+@FunctionalInterface
+public interface BulkResponseInspector {
+
+/**
+ * Callback to inspect a {@code response} in the context of its {@code 
request}. It may throw a
+ * {@link org.apache.flink.util.FlinkRuntimeException} to indicate that 
the bulk failed
+ * (partially).
+ */
+void inspect(BulkRequest request, BulkResponse response);
+
+/**
+ * Factory interface for creating a {@link BulkResponseInspector} in the 
context of a sink.
+ * Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to 
capture custom metrics.
+ */
+@PublicEvolving
+@FunctionalInterface
+interface BulkResponseInspectorFactory
+extends SerializableFunction<
+BulkResponseInspectorFactory.InitContext, 
BulkResponseInspector> {
+
+/**
+ * The interface exposes a subset of {@link
+ * org.apache.flink.api.connector.sink2.Sink.InitContext}.
+ */
+interface InitContext {
+
+/** Returns: The metric group of the surrounding writer. */
+MetricGroup metricGroup();
+}
+}
+}
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
index efe6dc2..05ac47a 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import 

(flink) branch master updated: [FLIP-321] Update the docs to add migration periods for deprecated APIs. (#23865)

2023-12-15 Thread jqin
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d4a3687aacd [FLIP-321] Update the docs to add migration periods for 
deprecated APIs. (#23865)
d4a3687aacd is described below

commit d4a3687aacdea61920098dd7814776655fde19db
Author: Jiangjie (Becket) Qin 
AuthorDate: Fri Dec 15 16:11:42 2023 +0800

[FLIP-321] Update the docs to add migration periods for deprecated APIs. 
(#23865)
---
 docs/content/docs/ops/upgrading.md | 38 +++---
 1 file changed, 35 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/ops/upgrading.md 
b/docs/content/docs/ops/upgrading.md
index ce58a7848e5..cc7d5e28cd8 100644
--- a/docs/content/docs/ops/upgrading.md
+++ b/docs/content/docs/ops/upgrading.md
@@ -55,11 +55,43 @@ This table lists the `source` / `binary` compatibility 
guarantees for each annot
 |  `Experimental`  |{{< xmark >}}/{{< xmark >}} |{{< xmark >}}/{{< 
xmark >}} |{{< xmark >}}/{{< xmark >}} |
 
 {{< hint info >}}
-{{< label Example >}}  
-Code written against a `PublicEvolving` API in 1.15.2 will continue to run in 
1.15.3, without having to recompile the code.  
-That same code would have to be recompiled when upgrading to 1.16.0 though.
+{{< label Example >}}
+Consider the code written against a `Public` API in 1.15.2:
+* The code can continue to run when upgrading to Flink 1.15.3 without 
recompiling, because patch version upgrades for `Public` APIs guarantee 
`binary` compatibility.
+* The same code may have to be recompiled when upgrading from 1.15.x to 
1.16.0, because minor version upgrades for `Public` APIs only provide `source` 
compatibility, not `binary` compatibility.
+* Code change may be required when upgrading from 1.x to 2.x because major 
version upgrades for `Public` APIs provide neither `source` nor `binary` 
compatibility.
+
+Consider the code written against a `PublicEvolving` API in 1.15.2:
+* The code can continue to run when upgrading to Flink 1.15.3 without 
recompiling, because patch version upgrades for `PublicEvolving` APIs guarantee 
`binary` compatibility.
+* A code change may be required when upgrading from 1.15.x to Flink 1.16.0, 
because minor version upgrades for `PublicEvolving` APIs provide neither 
`source` nor binary compatibility.
 {{< /hint >}}
 
+### Deprecated API Migration Period
+When an API is deprecated, it is marked with the `@Deprecated` annotation and 
a deprecation message is added to the Javadoc.
+According to 
[FLIP-321](https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process),
 
+starting from release 1.18, each deprecated API will have a guaranteed 
migration period depending on the API stability level:
+
+|Annotation|  Guaranteed Migration Period   |Could be 
removed after the migration period|
+|::|:--:|:-:|
+| `Public` |2 minor releases|  
  Next major version |
+| `PublicEvolving` |1 minor release |  
  Next minor version |
+|  `Experimental`  | 1 patch release for the affected minor release |  
  Next patch version |
+
+The source code of a deprecated API will be kept for at least the guaranteed 
migration period, 
+and may be removed at any point after the migration period has passed.
+
+{{< hint info >}}
+{{< label Example >}}
+Assuming a release sequence of 1.18, 1.19, 1.20, 2.0, 2.1, ..., 3.0,
+- if a `Public` API is deprecated in 1.18, it will not be removed until 2.0.
+- if a `Public` API is deprecated in 1.20, the source code will be kept in 2.0 
because the migration period is 2 minor releases. Also, because a `Public` API 
must maintain source compatibility throughout a major version, the source code 
will be kept for all the 2.x versions and removed in 3.0 at the earliest.
+- if a `PublicEvolving` API is deprecated in 1.18, it will be removed in 1.20 
at the earliest. 
+- if a `PublicEvolving` API is deprecated in 1.20, the source code will be 
kept in 2.0 because the migration period is 1 minor releases. The source code 
may be removed in 2.1 at the earliest.
+- if an `Experimental` API is deprecated in 1.18.0, the source code will be 
kept for 1.18.1 and removed in 1.18.2 at the earliest. Also, the source code 
can be removed in 1.19.0.  
+{{< /hint >}}
+
+Please check the 
[FLIP-321](https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process)
 wiki for more details.
+
 ## Restarting Streaming Applications
 
 The line of action for upgrading a streaming application or migrating an 
application to a different cluster is based on Flink's [Savepoint]({{<