(flink) branch release-1.18 updated: [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()

2024-05-14 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 1e1a7f16b6f [FLINK-27741][table-planner] Fix NPE when use dense_rank() 
and rank()
1e1a7f16b6f is described below

commit 1e1a7f16b6f272334d9f9a1053b657148151a789
Author: Sergey Nuyanzin 
AuthorDate: Wed May 15 07:37:47 2024 +0200

[FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()

Co-authored-by: Sergey Nuyanzin 

This closes apache#19797

Co-authored-by: chenzihao 
---
 .../aggfunctions/RankLikeAggFunctionBase.java  |  2 +-
 .../planner/plan/utils/AggFunctionFactory.scala| 17 +++---
 .../plan/batch/sql/agg/OverAggregateTest.xml   | 44 
 .../plan/batch/sql/agg/OverAggregateTest.scala | 13 +
 .../runtime/stream/sql/OverAggregateITCase.scala   | 60 ++
 5 files changed, 129 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
index 2a556d7b741..898939aedb9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
@@ -99,7 +99,7 @@ public abstract class RankLikeAggFunctionBase extends 
DeclarativeAggregateFuncti
 equalTo(lasValue, operand(i)));
 }
 Optional<Expression> ret = Arrays.stream(orderKeyEquals).reduce(ExpressionBuilder::and);
-return ret.orElseGet(() -> literal(true));
+return ret.orElseGet(() -> literal(false));
 }
 
 protected Expression generateInitLiteral(LogicalType orderType) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index cbabddae03a..0f1005e51f0 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -529,18 +529,23 @@ class AggFunctionFactory(
   }
 
   private def createRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new RankAggFunction(argTypes)
+new RankAggFunction(getArgTypesOrEmpty())
   }
 
   private def createDenseRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new DenseRankAggFunction(argTypes)
+new DenseRankAggFunction(getArgTypesOrEmpty())
   }
 
   private def createPercentRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new PercentRankAggFunction(argTypes)
+new PercentRankAggFunction(getArgTypesOrEmpty())
+  }
+
+  private def getArgTypesOrEmpty(): Array[LogicalType] = {
+if (orderKeyIndexes != null) {
+  orderKeyIndexes.map(inputRowType.getChildren.get(_))
+} else {
+  Array[LogicalType]()
+}
   }
 
   private def createNTILEAggFUnction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
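
The two hunks above make up the actual fix: AggFunctionFactory previously dereferenced orderKeyIndexes unconditionally, throwing a NullPointerException when it was null for a rank-like call, and RankLikeAggFunctionBase now falls back to literal(false) ("not equal to the previous row") when there are no order-key equality terms to reduce. A minimal, illustrative batch query over the touched functions (not the commit's own test, whose XML plan lives in OverAggregateTest.xml below):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RankOverSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                    "CREATE TABLE T (a INT, b BIGINT, c STRING) WITH ("
                            + "'connector' = 'datagen', 'number-of-rows' = '10')");
            // RANK()/DENSE_RANK() are planned through AggFunctionFactory#createRankAggFunction;
            // with the fix, a null orderKeyIndexes yields an empty argument-type array
            // instead of an NPE during planning.
            tEnv.executeSql(
                            "SELECT a, c, RANK() OVER (PARTITION BY c ORDER BY a) AS rk, "
                                    + "DENSE_RANK() OVER (PARTITION BY c ORDER BY a) AS drk FROM T")
                    .print();
        }
    }
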
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
index 909efe170f7..0ca5ec28442 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
@@ -280,6 +280,50 @@ OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 
RANG BETWEEN UNBOUNDE
 ]]>
 
   
+   [XML of the new RANK()/DENSE_RANK() plan test case stripped by the list archive]
   
 
   
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/O

(flink) branch release-1.19 updated: [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()

2024-05-14 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new 190522c2c05 [FLINK-27741][table-planner] Fix NPE when use dense_rank() 
and rank()
190522c2c05 is described below

commit 190522c2c051e0ec05213be71fb7a59a517353b1
Author: Sergey Nuyanzin 
AuthorDate: Wed May 15 07:36:10 2024 +0200

[FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()

Co-authored-by: Sergey Nuyanzin 

This closes apache#19797

Co-authored-by: chenzihao 
---
 .../aggfunctions/RankLikeAggFunctionBase.java  |  2 +-
 .../planner/plan/utils/AggFunctionFactory.scala| 17 +++---
 .../plan/batch/sql/agg/OverAggregateTest.xml   | 44 
 .../plan/batch/sql/agg/OverAggregateTest.scala | 13 +
 .../runtime/stream/sql/OverAggregateITCase.scala   | 60 ++
 5 files changed, 129 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
index 2a556d7b741..898939aedb9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
@@ -99,7 +99,7 @@ public abstract class RankLikeAggFunctionBase extends 
DeclarativeAggregateFuncti
 equalTo(lasValue, operand(i)));
 }
 Optional<Expression> ret = Arrays.stream(orderKeyEquals).reduce(ExpressionBuilder::and);
-return ret.orElseGet(() -> literal(true));
+return ret.orElseGet(() -> literal(false));
 }
 
 protected Expression generateInitLiteral(LogicalType orderType) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 861f537f6c2..fd44945dccf 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -529,18 +529,23 @@ class AggFunctionFactory(
   }
 
   private def createRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new RankAggFunction(argTypes)
+new RankAggFunction(getArgTypesOrEmpty())
   }
 
   private def createDenseRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new DenseRankAggFunction(argTypes)
+new DenseRankAggFunction(getArgTypesOrEmpty())
   }
 
   private def createPercentRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new PercentRankAggFunction(argTypes)
+new PercentRankAggFunction(getArgTypesOrEmpty())
+  }
+
+  private def getArgTypesOrEmpty(): Array[LogicalType] = {
+if (orderKeyIndexes != null) {
+  orderKeyIndexes.map(inputRowType.getChildren.get(_))
+} else {
+  Array[LogicalType]()
+}
   }
 
   private def createNTILEAggFUnction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
index 909efe170f7..0ca5ec28442 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
@@ -280,6 +280,50 @@ OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 
RANG BETWEEN UNBOUNDE
 ]]>
 
   
+   [XML of the new RANK()/DENSE_RANK() plan test case stripped by the list archive]
   
 
   
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/O

(flink) branch master updated: [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()

2024-05-14 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 40fb49dd17b [FLINK-27741][table-planner] Fix NPE when use dense_rank() 
and rank()
40fb49dd17b is described below

commit 40fb49dd17b3e1b6c5aa0249514273730ebe9226
Author: chenzihao 
AuthorDate: Tue May 14 22:18:05 2024 +0200

[FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()

Co-authored-by: Sergey Nuyanzin 

This closes apache#19797
---
 .../aggfunctions/RankLikeAggFunctionBase.java  |  2 +-
 .../planner/plan/utils/AggFunctionFactory.scala| 17 +++---
 .../plan/batch/sql/agg/OverAggregateTest.xml   | 44 
 .../plan/batch/sql/agg/OverAggregateTest.scala | 13 +
 .../runtime/stream/sql/OverAggregateITCase.scala   | 60 ++
 5 files changed, 129 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
index 2a556d7b741..898939aedb9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
@@ -99,7 +99,7 @@ public abstract class RankLikeAggFunctionBase extends 
DeclarativeAggregateFuncti
 equalTo(lasValue, operand(i)));
 }
 Optional<Expression> ret = Arrays.stream(orderKeyEquals).reduce(ExpressionBuilder::and);
-return ret.orElseGet(() -> literal(true));
+return ret.orElseGet(() -> literal(false));
 }
 
 protected Expression generateInitLiteral(LogicalType orderType) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 4ecd4363863..6ca84314fc7 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -532,18 +532,23 @@ class AggFunctionFactory(
   }
 
   private def createRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new RankAggFunction(argTypes)
+new RankAggFunction(getArgTypesOrEmpty())
   }
 
   private def createDenseRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new DenseRankAggFunction(argTypes)
+new DenseRankAggFunction(getArgTypesOrEmpty())
   }
 
   private def createPercentRankAggFunction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
-val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
-new PercentRankAggFunction(argTypes)
+new PercentRankAggFunction(getArgTypesOrEmpty())
+  }
+
+  private def getArgTypesOrEmpty(): Array[LogicalType] = {
+if (orderKeyIndexes != null) {
+  orderKeyIndexes.map(inputRowType.getChildren.get(_))
+} else {
+  Array[LogicalType]()
+}
   }
 
   private def createNTILEAggFUnction(argTypes: Array[LogicalType]): 
UserDefinedFunction = {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
index 909efe170f7..0ca5ec28442 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
@@ -280,6 +280,50 @@ OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 
RANG BETWEEN UNBOUNDE
 ]]>
 
   
+   [XML of the new RANK()/DENSE_RANK() plan test case stripped by the list archive]
   
 
   
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala
 
b/flink-table/flink-table-pla

(flink) 01/04: [FLINK-35193][table] Support drop materialized table syntax

2024-05-14 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8551ef39e0387f723a72299cc73aaaf827cf74bf
Author: Feng Jin 
AuthorDate: Mon May 13 20:06:41 2024 +0800

[FLINK-35193][table] Support drop materialized table syntax
---
 .../src/main/codegen/data/Parser.tdd   |  1 +
 .../src/main/codegen/includes/parserImpls.ftl  | 30 ++
 .../sql/parser/ddl/SqlDropMaterializedTable.java   | 68 ++
 .../flink/sql/parser/utils/ParserResource.java |  3 +
 .../MaterializedTableStatementParserTest.java  | 25 
 5 files changed, 127 insertions(+)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 81b3412954c..883b6aec1b2 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -76,6 +76,7 @@
 "org.apache.flink.sql.parser.ddl.SqlDropCatalog"
 "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
 "org.apache.flink.sql.parser.ddl.SqlDropFunction"
+"org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable"
 "org.apache.flink.sql.parser.ddl.SqlDropPartitions"
 
"org.apache.flink.sql.parser.ddl.SqlDropPartitions.AlterTableDropPartitionsContext"
 "org.apache.flink.sql.parser.ddl.SqlDropTable"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index bdc97818914..b2a5ea02d0f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1801,6 +1801,34 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
 }
 }
 
+/**
+* Parses a DROP MATERIALIZED TABLE statement.
+*/
+SqlDrop SqlDropMaterializedTable(Span s, boolean replace, boolean isTemporary) :
+{
+SqlIdentifier tableName = null;
+boolean ifExists = false;
+}
+{
+    <MATERIALIZED>
+    {
+        if (isTemporary) {
+            throw SqlUtil.newContextException(
+                getPos(),
+                ParserResource.RESOURCE.dropTemporaryMaterializedTableUnsupported());
+        }
+    }
+    <TABLE>
+
+    ifExists = IfExistsOpt()
+
+    tableName = CompoundIdentifier()
+
+    {
+        return new SqlDropMaterializedTable(s.pos(), tableName, ifExists);
+    }
+}
+
 /**
 * Parses alter materialized table.
 */
@@ -2427,6 +2455,8 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
 (
 drop = SqlDropCatalog(s, replace)
 |
+drop = SqlDropMaterializedTable(s, replace, isTemporary)
+|
 drop = SqlDropTable(s, replace, isTemporary)
 |
 drop = SqlDropView(s, replace, isTemporary)
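
Per the production above, TEMPORARY is rejected with dropTemporaryMaterializedTableUnsupported, while IF EXISTS and a compound identifier are parsed as usual. A minimal sketch of the statement shapes (table names are hypothetical; conversion and execution land in the companion 02/04 and 03/04 commits of this push):

    public class DropMaterializedTableSyntaxSketch {
        public static void main(String[] args) {
            // Accepted: optional IF EXISTS plus a compound identifier.
            String basic = "DROP MATERIALIZED TABLE mt_db.daily_sales";
            String guarded = "DROP MATERIALIZED TABLE IF EXISTS mt_catalog.mt_db.daily_sales";
            // Rejected: the production throws when the enclosing DROP was marked TEMPORARY.
            String rejected = "DROP TEMPORARY MATERIALIZED TABLE daily_sales";
            System.out.printf("%s%n%s%n%s%n", basic, guarded, rejected);
        }
    }
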
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java
new file mode 100644
index 000..ec9439fb13a
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDrop;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/** DROP MATERIALIZED TABLE DDL sql call. */
+public class SqlDropMaterializedTable extends SqlDrop {
+
+private static final SqlOperator OPERATOR =
+new SqlSpecialOperator("DROP MATERIALIZED TABLE", 
SqlKind.DROP_TABLE);
+
+private final SqlIdentifier tableIdentifier;
+
+public SqlDropMaterializedTable(
+ 

(flink) 03/04: [FLINK-35193][table] Support execution of drop materialized table

2024-05-14 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51b744bca1bdf53385152ed237f2950525046488
Author: Feng Jin 
AuthorDate: Mon May 13 20:08:38 2024 +0800

[FLINK-35193][table] Support execution of drop materialized table
---
 .../MaterializedTableManager.java  | 115 +-
 .../service/operation/OperationExecutor.java   |   9 +
 .../service/MaterializedTableStatementITCase.java  | 241 ++---
 .../apache/flink/table/catalog/CatalogManager.java |   4 +-
 4 files changed, 328 insertions(+), 41 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index b4ba12b8755..a51b1885c98 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.table.gateway.service.materializedtable;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
@@ -34,6 +35,7 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.service.operation.OperationExecutor;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
 import org.apache.flink.table.operations.command.StopJobOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
@@ -93,6 +95,9 @@ public class MaterializedTableManager {
 } else if (op instanceof AlterMaterializedTableResumeOperation) {
 return callAlterMaterializedTableResume(
 operationExecutor, handle, 
(AlterMaterializedTableResumeOperation) op);
+} else if (op instanceof DropMaterializedTableOperation) {
+return callDropMaterializedTableOperation(
+operationExecutor, handle, 
(DropMaterializedTableOperation) op);
 }
 
 throw new SqlExecutionException(
@@ -146,8 +151,7 @@ public class MaterializedTableManager {
 materializedTableIdentifier,
 e);
 operationExecutor.callExecutableOperation(
-handle,
-new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+handle, new 
DropMaterializedTableOperation(materializedTableIdentifier, true));
 throw e;
 }
 }
@@ -170,7 +174,8 @@ public class MaterializedTableManager {
 materializedTable.getSerializedRefreshHandler(),
 
operationExecutor.getSessionContext().getUserClassloader());
 
-String savepointPath = stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler);
+String savepointPath =
+stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler.getJobId());
 
 ContinuousRefreshHandler updateRefreshHandler =
 new ContinuousRefreshHandler(
@@ -183,9 +188,12 @@ public class MaterializedTableManager {
 CatalogMaterializedTable.RefreshStatus.SUSPENDED,
 
materializedTable.getRefreshHandlerDescription().orElse(null),
 serializeContinuousHandler(updateRefreshHandler));
+List<TableChange> tableChanges = new ArrayList<>();
+tableChanges.add(
+        TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
 AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
 new AlterMaterializedTableChangeOperation(
-tableIdentifier, Collections.emptyList(), 
updatedMaterializedTable);
+tableIdentifier, tableChanges, 
updatedMaterializedTable);
 
 operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
 
@@ -284,8 +292,7 @@ public class MaterializedTableManager {
 // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
 // atomicity is guaranteed
 

(flink) 04/04: [FLINK-35342][table] Fix MaterializedTableStatementITCase test can check for wrong status

2024-05-14 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94d861b08fef1e350d80a3f5f0f63168d327bc64
Author: Feng Jin 
AuthorDate: Tue May 14 11:18:40 2024 +0800

[FLINK-35342][table] Fix MaterializedTableStatementITCase test can check 
for wrong status
---
 .../service/MaterializedTableStatementITCase.java| 20 +++-
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 105c51ea597..dd7d25e124c 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -272,7 +272,7 @@ public class MaterializedTableStatementITCase {
 waitUntilAllTasksAreRunning(
 restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
 
-// check the background job is running
+// verify the background job is running
 String describeJobDDL = String.format("DESCRIBE JOB '%s'", 
activeRefreshHandler.getJobId());
 OperationHandle describeJobHandle =
 service.executeStatement(sessionHandle, describeJobDDL, -1, 
new Configuration());
@@ -653,7 +653,7 @@ public class MaterializedTableStatementITCase {
 assertThat(suspendMaterializedTable.getRefreshStatus())
 .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
 
-// check background job is stopped
+// verify background job is stopped
 byte[] refreshHandler = 
suspendMaterializedTable.getSerializedRefreshHandler();
 ContinuousRefreshHandler suspendRefreshHandler =
 ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
@@ -667,7 +667,7 @@ public class MaterializedTableStatementITCase {
 List<RowData> jobResults = fetchAllResults(service, sessionHandle, describeJobHandle);
 
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("FINISHED");
 
-// check savepoint is created
+// verify savepoint is created
 assertThat(suspendRefreshHandler.getRestorePath()).isNotEmpty();
 String actualSavepointPath = 
suspendRefreshHandler.getRestorePath().get();
 
@@ -692,7 +692,17 @@ public class MaterializedTableStatementITCase {
 assertThat(resumedCatalogMaterializedTable.getRefreshStatus())
 .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
 
-// check background job is running
+waitUntilAllTasksAreRunning(
+restClusterClient,
+JobID.fromHexString(
+ContinuousRefreshHandlerSerializer.INSTANCE
+.deserialize(
+resumedCatalogMaterializedTable
+.getSerializedRefreshHandler(),
+getClass().getClassLoader())
+.getJobId()));
+
+// verify background job is running
 refreshHandler = 
resumedCatalogMaterializedTable.getSerializedRefreshHandler();
 ContinuousRefreshHandler resumeRefreshHandler =
 ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
@@ -706,7 +716,7 @@ public class MaterializedTableStatementITCase {
 jobResults = fetchAllResults(service, sessionHandle, 
describeResumeJobHandle);
 
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
 
-// check resumed job is restored from savepoint
+// verify resumed job is restored from savepoint
 Optional<String> actualRestorePath =
 getJobRestoreSavepointPath(restClusterClient, resumeJobId);
 assertThat(actualRestorePath).isNotEmpty();



(flink) 02/04: [FLINK-35193][table] Support convert drop materialized table node to operation

2024-05-14 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd333941553c68c36e1460102ab023f80a5b1362
Author: Feng Jin 
AuthorDate: Mon May 13 20:07:39 2024 +0800

[FLINK-35193][table] Support convert drop materialized table node to 
operation
---
 .../DropMaterializedTableOperation.java|  6 ++--
 .../SqlDropMaterializedTableConverter.java | 41 ++
 .../operations/converters/SqlNodeConverters.java   |  1 +
 ...erializedTableNodeToOperationConverterTest.java | 21 +++
 4 files changed, 65 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
index e5eee557bfc..46dd86ad96b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
@@ -33,9 +33,8 @@ import java.util.Map;
 public class DropMaterializedTableOperation extends DropTableOperation
 implements MaterializedTableOperation {
 
-public DropMaterializedTableOperation(
-ObjectIdentifier tableIdentifier, boolean ifExists, boolean 
isTemporary) {
-super(tableIdentifier, ifExists, isTemporary);
+public DropMaterializedTableOperation(ObjectIdentifier tableIdentifier, 
boolean ifExists) {
+super(tableIdentifier, ifExists, false);
 }
 
 @Override
@@ -43,7 +42,6 @@ public class DropMaterializedTableOperation extends 
DropTableOperation
 Map<String, Object> params = new LinkedHashMap<>();
 params.put("identifier", getTableIdentifier());
 params.put("IfExists", isIfExists());
-params.put("isTemporary", isTemporary());
 
 return OperationUtils.formatWithChildren(
 "DROP MATERIALIZED TABLE",
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java
new file mode 100644
index 000..6501dc0c453
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+
+/** A converter for {@link SqlDropMaterializedTable}. */
+public class SqlDropMaterializedTableConverter
+        implements SqlNodeConverter<SqlDropMaterializedTable> {
+@Override
+public Operation convertSqlNode(
+        SqlDropMaterializedTable sqlDropMaterializedTable, ConvertContext context) {
+    UnresolvedIdentifier unresolvedIdentifier =
+            UnresolvedIdentifier.of(sqlDropMaterializedTable.fullTableName());
+    ObjectIdentifier identifier =
+            context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+    // Currently we don't support temporary materialized table, so isTemporary is always false
+    return new DropMaterializedTableOperation(
+            identifier, sqlDropMaterializedTable.getIfExists());
+}
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java

(flink) branch master updated (65d31e26534 -> 94d861b08fe)

2024-05-14 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 65d31e26534 [FLINK-33986][runtime] Extend ShuffleMaster to support 
snapshot and restore state.
 new 8551ef39e03 [FLINK-35193][table] Support drop materialized table syntax
 new fd333941553 [FLINK-35193][table] Support convert drop materialized 
table node to operation
 new 51b744bca1b [FLINK-35193][table] Support execution of drop 
materialized table
 new 94d861b08fe [FLINK-35342][table] Fix MaterializedTableStatementITCase 
test can check for wrong status

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../MaterializedTableManager.java  | 115 -
 .../service/operation/OperationExecutor.java   |   9 +
 .../service/MaterializedTableStatementITCase.java  | 261 ++---
 .../src/main/codegen/data/Parser.tdd   |   1 +
 .../src/main/codegen/includes/parserImpls.ftl  |  30 +++
 ...pCatalog.java => SqlDropMaterializedTable.java} |  40 ++--
 .../flink/sql/parser/utils/ParserResource.java |   3 +
 .../MaterializedTableStatementParserTest.java  |  25 ++
 .../apache/flink/table/catalog/CatalogManager.java |   4 +-
 .../DropMaterializedTableOperation.java|   6 +-
 ...java => SqlDropMaterializedTableConverter.java} |  20 +-
 .../operations/converters/SqlNodeConverters.java   |   1 +
 ...erializedTableNodeToOperationConverterTest.java |  21 ++
 13 files changed, 455 insertions(+), 81 deletions(-)
 copy 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/{SqlDropCatalog.java
 => SqlDropMaterializedTable.java} (68%)
 copy 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/{SqlAlterMaterializedTableSuspendConverter.java
 => SqlDropMaterializedTableConverter.java} (59%)



(flink) branch master updated: [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state.

2024-05-14 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 65d31e26534 [FLINK-33986][runtime] Extend ShuffleMaster to support 
snapshot and restore state.
65d31e26534 is described below

commit 65d31e26534836909f6b8139c6bd6cd45b91bba4
Author: JunRuiLee 
AuthorDate: Fri Feb 2 18:01:03 2024 +0800

[FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore 
state.
---
 .../DefaultShuffleMasterSnapshotContext.java   | 22 +
 .../shuffle/EmptyShuffleMasterSnapshot.java| 37 ++
 .../flink/runtime/shuffle/NettyShuffleMaster.java  | 12 +++
 .../flink/runtime/shuffle/ShuffleMaster.java   | 19 +++
 .../runtime/shuffle/ShuffleMasterSnapshot.java | 31 ++
 .../shuffle/ShuffleMasterSnapshotContext.java  | 22 +
 6 files changed, 143 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/DefaultShuffleMasterSnapshotContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/DefaultShuffleMasterSnapshotContext.java
new file mode 100644
index 000..8fcff36eef1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/DefaultShuffleMasterSnapshotContext.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+/** Default {@link ShuffleMasterSnapshotContext} implementation. */
+public class DefaultShuffleMasterSnapshotContext implements 
ShuffleMasterSnapshotContext {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/EmptyShuffleMasterSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/EmptyShuffleMasterSnapshot.java
new file mode 100644
index 000..683ccd07968
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/EmptyShuffleMasterSnapshot.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+/**
+ * A singleton implementation of {@link ShuffleMasterSnapshot} that represents 
an empty snapshot of
+ * shuffle master.
+ */
+public class EmptyShuffleMasterSnapshot implements ShuffleMasterSnapshot {
+
+private static final EmptyShuffleMasterSnapshot INSTANCE = new 
EmptyShuffleMasterSnapshot();
+
+@Override
+public boolean isIncremental() {
+return false;
+}
+
+public static EmptyShuffleMasterSnapshot getInstance() {
+return INSTANCE;
+}
+}
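
A minimal sketch of how a stateless shuffle master could serve the new snapshot hook, using only the classes added in this commit (completing the future with the shared singleton is an assumed usage pattern, not code from the commit):

    import org.apache.flink.runtime.shuffle.EmptyShuffleMasterSnapshot;
    import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;

    import java.util.concurrent.CompletableFuture;

    public class SnapshotSketch {
        public static void main(String[] args) {
            CompletableFuture<ShuffleMasterSnapshot> snapshotFuture = new CompletableFuture<>();
            // A shuffle master with nothing to persist can always report the empty,
            // non-incremental snapshot.
            snapshotFuture.complete(EmptyShuffleMasterSnapshot.getInstance());
            System.out.println(snapshotFuture.join().isIncremental()); // false
        }
    }
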
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 9cce16cf495..461457a29b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -187,4 +187,16 @@ public class NettyShuffleMaster implements 
ShuffleMaster
 public void unregisterJob(JobID jobId) {
 jobShuffleContexts.remove(jobId);
 }
+
+@Override
+public boolean supportsBatchSnapshot() {
+return true;
+}
+
+@Override
+public void snapshotState(
+CompletableFuture 

(flink) branch master updated: [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762)

2024-05-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9a5a99b1a30 [FLINK-32087][checkpoint] Introduce space amplification 
statistics of file merging (#24762)
9a5a99b1a30 is described below

commit 9a5a99b1a30054268bbde36d565cbb1b81018890
Author: Yanfei Lei 
AuthorDate: Tue May 14 16:21:03 2024 +0800

[FLINK-32087][checkpoint] Introduce space amplification statistics of file 
merging (#24762)
---
 .../filemerging/FileMergingSnapshotManager.java| 75 ++
 .../FileMergingSnapshotManagerBase.java| 49 ++
 .../checkpoint/filemerging/LogicalFile.java|  1 +
 .../checkpoint/filemerging/PhysicalFile.java   | 32 +++--
 ...ssCheckpointFileMergingSnapshotManagerTest.java | 30 -
 .../FileMergingSnapshotManagerTestBase.java| 47 ++
 ...inCheckpointFileMergingSnapshotManagerTest.java | 30 -
 ...FileMergingCheckpointStateOutputStreamTest.java |  5 +-
 8 files changed, 247 insertions(+), 22 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index add8806369c..f3523c4430f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
 
 import java.io.Closeable;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 /**
@@ -268,4 +269,78 @@ public interface FileMergingSnapshotManager extends 
Closeable {
 "%s-%s(%d/%d)", jobIDString, operatorIDString, 
subtaskIndex, parallelism);
 }
 }
+
+/** Space usage statistics of a managed directory. */
+final class SpaceStat {
+
+AtomicLong physicalFileCount;
+AtomicLong physicalFileSize;
+
+AtomicLong logicalFileCount;
+AtomicLong logicalFileSize;
+
+public SpaceStat() {
+this(0, 0, 0, 0);
+}
+
+public SpaceStat(
+long physicalFileCount,
+long physicalFileSize,
+long logicalFileCount,
+long logicalFileSize) {
+this.physicalFileCount = new AtomicLong(physicalFileCount);
+this.physicalFileSize = new AtomicLong(physicalFileSize);
+this.logicalFileCount = new AtomicLong(logicalFileCount);
+this.logicalFileSize = new AtomicLong(logicalFileSize);
+}
+
+public void onLogicalFileCreate(long size) {
+physicalFileSize.addAndGet(size);
+logicalFileSize.addAndGet(size);
+logicalFileCount.incrementAndGet();
+}
+
+public void onLogicalFileDelete(long size) {
+logicalFileSize.addAndGet(-size);
+logicalFileCount.decrementAndGet();
+}
+
+public void onPhysicalFileCreate() {
+physicalFileCount.incrementAndGet();
+}
+
+public void onPhysicalFileDelete(long size) {
+physicalFileSize.addAndGet(-size);
+physicalFileCount.decrementAndGet();
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+SpaceStat spaceStat = (SpaceStat) o;
+return physicalFileCount.get() == spaceStat.physicalFileCount.get()
+&& physicalFileSize.get() == 
spaceStat.physicalFileSize.get()
+&& logicalFileCount.get() == 
spaceStat.logicalFileCount.get()
+&& logicalFileSize.get() == 
spaceStat.logicalFileSize.get();
+}
+
+@Override
+public String toString() {
+return "SpaceStat{"
++ "physicalFileCount="
++ physicalFileCount.get()
++ ", physicalFileSize="
++ physicalFileSize.get()
++ ", logicalFileCount="
++ logicalFileCount.get()
++ ", logicalFileSize="
++ logicalFileSize.get()
++ '}';
+}
+}
 }
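
The four counters are what the space-amplification statistic derives from: logical bytes shrink as soon as logical files are deleted, while physical bytes only shrink once a whole shared physical file can be discarded. A minimal sketch of the ratio (the helper is illustrative, not part of the commit):

    import java.util.concurrent.atomic.AtomicLong;

    public class SpaceAmplificationSketch {
        // Amplification = bytes still occupied by physical files / bytes of live logical files.
        static double amplification(AtomicLong physicalFileSize, AtomicLong logicalFileSize) {
            long logical = logicalFileSize.get();
            return logical == 0 ? 1.0 : (double) physicalFileSize.get() / logical;
        }

        public static void main(String[] args) {
            AtomicLong physical = new AtomicLong(300L); // bytes held by merged physical files
            AtomicLong logical = new AtomicLong(100L);  // bytes referenced by live logical files
            System.out.println(amplification(physical, logical)); // 3.0
        }
    }
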
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index 1e1abdcaf64..

(flink) branch release-1.18 updated: [FLINK-35098][ORC] Fix incorrect results with literal first expressions

2024-05-14 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 1f604da2dfc  [FLINK-35098][ORC] Fix incorrect results with literal 
first expressions
1f604da2dfc is described below

commit 1f604da2dfc831d04826a20b3cb272d2ad9dfb56
Author: Andrey Gaskov <31715230+empath...@users.noreply.github.com>
AuthorDate: Tue May 14 14:10:56 2024 +0700

 [FLINK-35098][ORC] Fix incorrect results with literal first expressions
---
 .../main/java/org/apache/flink/orc/OrcFilters.java |  8 +--
 .../apache/flink/orc/OrcFileSystemFilterTest.java  | 58 +++---
 .../org/apache/flink/orc/OrcFileSystemITCase.java  | 11 
 3 files changed, 65 insertions(+), 12 deletions(-)

diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
index 4393356fc30..46eb9af0927 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
@@ -83,28 +83,28 @@ public class OrcFilters {
 convertBinary(
 call,
 
OrcFilters::convertGreaterThan,
-
OrcFilters::convertLessThanEquals))
+
OrcFilters::convertLessThan))
 .put(
 
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertGreaterThanEquals,
-
OrcFilters::convertLessThan))
+
OrcFilters::convertLessThanEquals))
 .put(
 BuiltInFunctionDefinitions.LESS_THAN,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertLessThan,
-
OrcFilters::convertGreaterThanEquals))
+
OrcFilters::convertGreaterThan))
 .put(
 
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertLessThanEquals,
-
OrcFilters::convertGreaterThan))
+
OrcFilters::convertGreaterThanEquals))
 .build();
 
 private static boolean isRef(Expression expression) {
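
The root cause is worth spelling out: when a predicate arrives literal-first ("10 > x"), reversing the operands must mirror the comparison ("x < 10"); the old table applied the complement ("x <= 10"), silently turning strict bounds into non-strict ones and vice versa. A minimal sketch of the mirroring rule (the enum and helper are illustrative, not the OrcFilters API):

    public class MirrorComparisonSketch {
        enum Op { GT, GE, LT, LE }

        // Rewrites "literal op field" as "field mirror(op) literal".
        static Op mirror(Op op) {
            switch (op) {
                case GT: return Op.LT; // 10 >  x  <=>  x <  10
                case GE: return Op.LE; // 10 >= x  <=>  x <= 10
                case LT: return Op.GT; // 10 <  x  <=>  x >  10
                default: return Op.GE; // 10 <= x  <=>  x >= 10
            }
        }

        public static void main(String[] args) {
            System.out.println(mirror(Op.GT)); // LT, not LE
        }
    }
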
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
index cdee400723d..f5d9ed33423 100644
--- 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
@@ -25,13 +25,13 @@ import 
org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type.LONG;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit Tests for {@link OrcFileFormatFactory}. */
@@ -39,7 +39,7 @@ class OrcFileSystemFilterTest {
 
 @Test
 @SuppressWarnings("unchecked")
-public void testApplyPredicate() {
+void testApplyPredicate() {
 List<ResolvedExpression> args = new ArrayList<>();
 
 // equal
@@ -53,8 +53,7 @@ class OrcFileSystemFilterTest {
 CallExpression.permanent(
 BuiltInFunctionDefinitions.EQUALS, args, 
DataTypes.BOOLEAN());
 OrcFilters.Predicate predicate1 = 
OrcFilters.toOrcPredicate(equalExpression);
-OrcFilters.Predicate predicate2 =
-new OrcFilters.

(flink) branch release-1.19 updated: [FLINK-35098][ORC] Fix incorrect results with literal first expressions

2024-05-14 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new e16da86dfb1 [FLINK-35098][ORC] Fix incorrect results with literal 
first expressions
e16da86dfb1 is described below

commit e16da86dfb1fbeee541cd9dfccd5f5f4520b7396
Author: Andrey Gaskov <31715230+empath...@users.noreply.github.com>
AuthorDate: Tue May 14 14:10:24 2024 +0700

[FLINK-35098][ORC] Fix incorrect results with literal first expressions
---
 .../main/java/org/apache/flink/orc/OrcFilters.java |  8 +--
 .../apache/flink/orc/OrcFileSystemFilterTest.java  | 58 +++---
 .../org/apache/flink/orc/OrcFileSystemITCase.java  | 11 
 3 files changed, 65 insertions(+), 12 deletions(-)

diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
index 4393356fc30..46eb9af0927 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
@@ -83,28 +83,28 @@ public class OrcFilters {
 convertBinary(
 call,
 
OrcFilters::convertGreaterThan,
-
OrcFilters::convertLessThanEquals))
+
OrcFilters::convertLessThan))
 .put(
 
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertGreaterThanEquals,
-
OrcFilters::convertLessThan))
+
OrcFilters::convertLessThanEquals))
 .put(
 BuiltInFunctionDefinitions.LESS_THAN,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertLessThan,
-
OrcFilters::convertGreaterThanEquals))
+
OrcFilters::convertGreaterThan))
 .put(
 
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertLessThanEquals,
-
OrcFilters::convertGreaterThan))
+
OrcFilters::convertGreaterThanEquals))
 .build();
 
 private static boolean isRef(Expression expression) {
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
index cdee400723d..f5d9ed33423 100644
--- 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
@@ -25,13 +25,13 @@ import 
org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type.LONG;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit Tests for {@link OrcFileFormatFactory}. */
@@ -39,7 +39,7 @@ class OrcFileSystemFilterTest {
 
 @Test
 @SuppressWarnings("unchecked")
-public void testApplyPredicate() {
+void testApplyPredicate() {
 List<ResolvedExpression> args = new ArrayList<>();
 
 // equal
@@ -53,8 +53,7 @@ class OrcFileSystemFilterTest {
 CallExpression.permanent(
 BuiltInFunctionDefinitions.EQUALS, args, 
DataTypes.BOOLEAN());
 OrcFilters.Predicate predicate1 = 
OrcFilters.toOrcPredicate(equalExpression);
-OrcFilters.Predicate predicate2 =
-new OrcFilters.Eq

(flink) branch master updated: [FLINK-35098][ORC] Fix incorrect results with literal first expressions

2024-05-14 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4165bac27bd [FLINK-35098][ORC] Fix incorrect results with literal 
first expressions
4165bac27bd is described below

commit 4165bac27bda4457e5940a994d923242d4a271dc
Author: Andrey Gaskov <31715230+empath...@users.noreply.github.com>
AuthorDate: Tue May 14 14:09:51 2024 +0700

[FLINK-35098][ORC] Fix incorrect results with literal first expressions
---
 .../main/java/org/apache/flink/orc/OrcFilters.java |  8 +--
 .../apache/flink/orc/OrcFileSystemFilterTest.java  | 58 +++---
 .../org/apache/flink/orc/OrcFileSystemITCase.java  | 11 
 3 files changed, 65 insertions(+), 12 deletions(-)

diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
index 4393356fc30..46eb9af0927 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
@@ -83,28 +83,28 @@ public class OrcFilters {
 convertBinary(
 call,
 
OrcFilters::convertGreaterThan,
-
OrcFilters::convertLessThanEquals))
+
OrcFilters::convertLessThan))
 .put(
 
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertGreaterThanEquals,
-
OrcFilters::convertLessThan))
+
OrcFilters::convertLessThanEquals))
 .put(
 BuiltInFunctionDefinitions.LESS_THAN,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertLessThan,
-
OrcFilters::convertGreaterThanEquals))
+
OrcFilters::convertGreaterThan))
 .put(
 
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
 call ->
 convertBinary(
 call,
 
OrcFilters::convertLessThanEquals,
-
OrcFilters::convertGreaterThan))
+
OrcFilters::convertGreaterThanEquals))
 .build();
 
 private static boolean isRef(Expression expression) {
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
index cdee400723d..f5d9ed33423 100644
--- 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
@@ -25,13 +25,13 @@ import 
org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type.LONG;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit Tests for {@link OrcFileFormatFactory}. */
@@ -39,7 +39,7 @@ class OrcFileSystemFilterTest {
 
 @Test
 @SuppressWarnings("unchecked")
-public void testApplyPredicate() {
+void testApplyPredicate() {
 List<ResolvedExpression> args = new ArrayList<>();
 
 // equal
@@ -53,8 +53,7 @@ class OrcFileSystemFilterTest {
 CallExpression.permanent(
 BuiltInFunctionDefinitions.EQUALS, args, 
DataTypes.BOOLEAN());
 OrcFilters.Predicate predicate1 = 
OrcFilters.toOrcPredicate(equalExpression);
-OrcFilters.Predicate predicate2 =
-new OrcFilters.Equals("long1"

(flink-connector-kudu) branch main updated: [hotfix] Remove non-existing branches for Weekly tests

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git


The following commit(s) were added to refs/heads/main by this push:
 new 2b82430  [hotfix] Remove non-existing branches for Weekly tests
2b82430 is described below

commit 2b82430280d347193d8ba26a844a3c4344365964
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Tue May 14 09:09:38 2024 +0200

[hotfix] Remove non-existing branches for Weekly tests
---
 .github/workflows/weekly.yml | 18 +++---
 1 file changed, 3 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index f3210c0..7b87f4d 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -30,9 +30,6 @@ jobs:
 strategy:
   matrix:
 flink_branches: [{
-  flink: 1.17-SNAPSHOT,
-  branch: main
-}, {
   flink: 1.18-SNAPSHOT,
   jdk: '8, 11, 17',
   branch: main
@@ -41,18 +38,9 @@ jobs:
   jdk: '8, 11, 17, 21',
   branch: main
 }, {
-  flink: 1.17.2,
-  branch: v3.1
-}, {
-  flink: 1.18.1,
-  jdk: '8, 11, 17',
-  branch: v3.1
-}, {
-  flink: 1.17.2,
-  branch: v3.0
-}, {
-  flink: 1.18.1,
-  branch: v3.0
+  flink: 1.20-SNAPSHOT,
+  jdk: '8, 11, 17, 21',
+  branch: main
 }]
 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
 with:



(flink-connector-kudu) branch main updated: [FLINK-34961] Use dedicated CI name for JDBC connector to differentiate it in infra-reports

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git


The following commit(s) were added to refs/heads/main by this push:
 new 4098566  [FLINK-34961] Use dedicated CI name for JDBC connector to 
differentiate it in infra-reports
4098566 is described below

commit 409856698df1bf630c58f6e761d45e4dc8f06ad5
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Tue May 14 09:04:00 2024 +0200

[FLINK-34961] Use dedicated CI name for JDBC connector to differentiate it 
in infra-reports
---
 .github/workflows/push_pr.yml | 5 -
 .github/workflows/weekly.yml  | 5 -
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 72b98a2..da2f077 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -16,7 +16,10 @@
 # limitations under the License.
 

 
-name: CI
+# We need to specify repo related information here since Apache INFRA doesn't differentiate
+# between several workflows with the same names while preparing a report for GHA usage
+# https://infra-reports.apache.org/#ghactions
+name: Flink Connector Kudu CI
 on: [push, pull_request]
 concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index aaa729f..f3210c0 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -16,7 +16,10 @@
 # limitations under the License.
 

 
-name: Nightly
+# We need to specify repo related information here since Apache INFRA doesn't differentiate
+# between several workflows with the same names while preparing a report for GHA usage
+# https://infra-reports.apache.org/#ghactions
+name: Weekly Flink Connector Kudu
 on:
   schedule:
 - cron: "0 0 * * 0"



(flink-connector-kudu) 40/44: [FLINK-34930] Fix KuduTableSourceITCase

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit abc4a3b965e54406eaddcf06261116916440c4c5
Author: Ferenc Csaky 
AuthorDate: Thu Mar 28 15:05:46 2024 +0100

[FLINK-34930] Fix KuduTableSourceITCase
---
 .../connector/kudu/table/KuduTableSourceITCase.java  | 20 
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java
index 2468e77..7971bda 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -36,9 +37,11 @@ public class KuduTableSourceITCase extends KuduTestBase {
 private TableEnvironment tableEnv;
 private KuduCatalog catalog;
 
+private KuduTableInfo tableInfo = null;
+
 @BeforeEach
-public void init() {
-KuduTableInfo tableInfo = booksTableInfo("books", true);
+void init() {
+tableInfo = booksTableInfo("books", true);
 setUpDatabase(tableInfo);
 tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode();
 catalog = new KuduCatalog(getMasterAddress());
@@ -46,6 +49,14 @@ public class KuduTableSourceITCase extends KuduTestBase {
 tableEnv.useCatalog("kudu");
 }
 
+@AfterEach
+void cleanup() {
+if (tableInfo != null) {
+cleanDatabase(tableInfo);
+tableInfo = null;
+}
+}
+
 @Test
 void testFullBatchScan() throws Exception {
 CloseableIterator it =
@@ -53,7 +64,8 @@ public class KuduTableSourceITCase extends KuduTestBase {
 List results = new ArrayList<>();
 it.forEachRemaining(results::add);
 assertEquals(5, results.size());
-assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", 
results.get(0).toString());
+assertEquals(
+"+I[1001, Java for dummies, Tan Ah Teck, 11.11, 11]", 
results.get(0).toString());
 tableEnv.executeSql("DROP TABLE books");
 }
 
@@ -68,7 +80,7 @@ public class KuduTableSourceITCase extends KuduTestBase {
 List results = new ArrayList<>();
 it.forEachRemaining(results::add);
 assertEquals(1, results.size());
-assertEquals("More Java for more dummies", results.get(0).toString());
+assertEquals("+I[More Java for more dummies]", 
results.get(0).toString());
 tableEnv.executeSql("DROP TABLE books");
 }
 }
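For context on the new expected strings: the results are Flink Row objects, and Row.toString() prints the row kind ("+I" for insert) before the bracketed field list. A minimal, self-contained illustration using only Flink's public Row API (this class is not part of the commit):

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    class RowToStringDemo {
        public static void main(String[] args) {
            // Row.toString() renders the row kind short string followed by the
            // field list, hence the "+I[...]" expectations in the test above.
            Row row = Row.ofKind(RowKind.INSERT, 1001, "Java for dummies");
            System.out.println(row); // prints: +I[1001, Java for dummies]
        }
    }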



(flink-connector-kudu) 25/44: BAHIR-296: Unify mockito version to 1.10.19

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 1dce82988ac057da3ca3ee5dcb67df227cd86284
Author: Joao Boto 
AuthorDate: Mon Dec 27 16:36:32 2021 +0100

BAHIR-296: Unify mockito version to 1.10.19
---
 flink-connector-kudu/pom.xml | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 12fd9da..750929c 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -31,7 +31,6 @@
 
   
 1.13.0
-1.10.19
   
 
   
@@ -97,8 +96,6 @@
 
   org.mockito
   mockito-all
-  ${mockito.version}
-  test
 
 
 
@@ -119,7 +116,7 @@
   ${log4j.version}
   test
 
-
+  
 
   
 



(flink-connector-kudu) 41/44: [FLINK-34930] State Bahir fork

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 9253b9d7b59abd34c7f0c87e6ad5673984ed03c9
Author: Ferenc Csaky 
AuthorDate: Tue Apr 2 16:57:16 2024 +0200

[FLINK-34930] State Bahir fork
---
 README.md | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 1944eca..477aa84 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,10 @@
 
 This repository contains the official Apache Flink Kudu connector.
 
+## Forked from Bahir
+
+The connector code is forked from the [Apache Bahir](https://bahir.apache.org/) project after its retirement.
+
 ## Apache Flink
 
 Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.
@@ -65,4 +69,4 @@ This article describes [how to contribute to Apache Flink](https://flink.apache.
 ## About
 
 Apache Flink is an open source project of The Apache Software Foundation (ASF).
-The Apache Flink project originated from the [Stratosphere](http://stratosphere.eu) research project.
\ No newline at end of file
+The Apache Flink project originated from the [Stratosphere](http://stratosphere.eu) research project.



(flink-connector-kudu) 42/44: [FLINK-34930] Skip spotless for JDK 21+

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit c7dfb67801ced03fe028e5f738fc2ced670440b4
Author: Ferenc Csaky 
AuthorDate: Mon Apr 8 13:40:05 2024 +0200

[FLINK-34930] Skip spotless for JDK 21+
---
 pom.xml | 25 +
 1 file changed, 25 insertions(+)

diff --git a/pom.xml b/pom.xml
index 6d8617a..69e2d74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -321,4 +321,29 @@ under the License.



+
+   
+   
+   java21
+   
+   [21,)
+   
+   
+   
+   
+   
+   
com.diffplug.spotless
+   
spotless-maven-plugin
+   
+   
+   
true
+   
+   
+   
+   
+   
+   
+   
 



(flink-connector-kudu) 17/44: Add batch table env support and filter push down to Kudu connector (#82)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 6e8b66f16b25802f5111f7b70a20e7d7a1ca5d65
Author: Sebastian Liu 
AuthorDate: Tue Jul 14 11:25:44 2020 +0800

Add batch table env support and filter push down to Kudu connector (#82)

Update the KuduTableSource to inherit from InputFormatTableSource
in order to support both streaming SQL and Batch SQL at the same time.
In order to reduce unnecessary data transmission, the filter push down
was also added to the KuduTableSource.
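As a simplified sketch of the push-down idea (hypothetical stand-in types; the real source implements Flink's legacy FilterableTableSource contract, and the conversion logic lives in KuduTableUtils per the file list below): predicates the source can translate into Kudu scanner filters are consumed, and everything else is left for the planner to evaluate.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // PREDICATE/KUDU_FILTER are placeholders; the real code uses Flink
    // Expressions and KuduFilterInfo.
    class FilterPushDownSketch<PREDICATE, KUDU_FILTER> {
        private final List<KUDU_FILTER> pushedFilters = new ArrayList<>();

        // Convertible predicates are removed from the list (Kudu evaluates
        // them server-side); the remainder stays so the planner still
        // applies it after the scan.
        void applyPredicate(List<PREDICATE> predicates) {
            Iterator<PREDICATE> it = predicates.iterator();
            while (it.hasNext()) {
                KUDU_FILTER converted = toKuduFilter(it.next());
                if (converted != null) {
                    pushedFilters.add(converted);
                    it.remove();
                }
            }
        }

        private KUDU_FILTER toKuduFilter(PREDICATE p) {
            // Placeholder: the real converter maps comparison expressions
            // onto KuduFilterInfo instances and yields null for the rest.
            return null;
        }
    }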
---
 flink-connector-kudu/README.md |   2 +-
 flink-connector-kudu/pom.xml   |   2 +-
 .../connectors/kudu/connector/KuduFilterInfo.java  |  14 +-
 .../flink/connectors/kudu/table/KuduCatalog.java   |   5 +-
 .../connectors/kudu/table/KuduCatalogFactory.java  |   4 +-
 .../connectors/kudu/table/KuduTableFactory.java|   2 +-
 .../connectors/kudu/table/KuduTableSource.java |  96 +--
 .../kudu/table/utils/KuduTableUtils.java   | 142 
 .../connectors/kudu/table/KuduCatalogTest.java |   2 +-
 .../kudu/table/KuduTableFactoryTest.java   |   2 +-
 .../kudu/table/KuduTableSourceITCase.java  |  67 
 .../connectors/kudu/table/KuduTableSourceTest.java | 181 +
 .../connectors/kudu/table/KuduTableTestUtils.java  |  10 +-
 13 files changed, 496 insertions(+), 33 deletions(-)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 14c13eb..6370aa6 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -184,7 +184,7 @@ are described as being nullable, and not being primary keys.
 
 ## DataStream API
 
-It is also possible to use the the Kudu connector directly from the DataStream API however we
+It is also possible to use the Kudu connector directly from the DataStream API however we
 encourage all users to explore the Table API as it provides a lot of useful tooling when working
 with Kudu data.
 
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index fe7887c..3bbefee 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -52,7 +52,7 @@
 
 
   org.apache.flink
-  flink-table-planner-blink_2.11
+  flink-table-planner-blink_${scala.binary.version}
   ${flink.version}
   provided
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index 0a89cad..08fa86b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -68,25 +68,25 @@ public class KuduFilterInfo implements Serializable {
 predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value);
 break;
 case FLOAT:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) this.value);
 break;
 case INT8:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte) this.value);
 break;
 case INT16:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (short) this.value);
 break;
 case INT32:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (int) this.value);
 break;
 case INT64:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (long) this.value);
 break;
 case DOUBLE:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (double) this.value);
 break;
 case BOOL:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (boolean) this.value);
 break;
 case UNIXTIME_MICROS:

(flink-connector-kudu) 43/44: [FLINK-34930] Enable module opens for tests for newer JDKs

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 5230ec106551307d83d3998fbe0d08d35fc7d2f8
Author: Ferenc Csaky 
AuthorDate: Thu Apr 11 14:42:02 2024 +0200

[FLINK-34930] Enable module opens for tests for newer JDKs
---
 pom.xml | 4 
 1 file changed, 4 insertions(+)

diff --git a/pom.xml b/pom.xml
index 69e2d74..d1548f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,10 @@ under the License.
 


flink-connector-kudu-parent
+   
+   --add-opens=java.base/java.lang=ALL-UNNAMED
+   --add-opens=java.base/java.util=ALL-UNNAMED
+   
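The XML property tags around these flags were stripped in transit, but the effect is to pass them to the forked test JVMs. Why they are needed, as a standalone illustration assuming only the JDK (this class is not part of the commit):

    import java.lang.reflect.Field;
    import java.util.HashMap;

    class AddOpensDemo {
        public static void main(String[] args) throws Exception {
            // Since JDK 16 (JEP 396), java.base is strongly encapsulated:
            // this setAccessible call throws InaccessibleObjectException
            // unless the JVM runs with
            // --add-opens=java.base/java.util=ALL-UNNAMED.
            Field table = HashMap.class.getDeclaredField("table");
            table.setAccessible(true);
            System.out.println("java.util opened to the unnamed module");
        }
    }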

 




(flink-connector-kudu) 44/44: [FLINK-34930] Migrate Bahir NOTICE

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 8674446dc0f1bab67528347ebfefc206bd380b6e
Author: Ferenc Csaky 
AuthorDate: Wed Apr 17 11:26:04 2024 +0200

[FLINK-34930] Migrate Bahir NOTICE
---
 NOTICE | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/NOTICE b/NOTICE
index c1e8320..c3ce37e 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
-Apache Flink Kudu Connector
-Copyright 2014-2024 The Apache Software Foundation
+Apache Bahir
+Copyright (c) 2016-2024 The Apache Software Foundation.
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
@@ -12,5 +12,3 @@ ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUT
 DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
 WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE
 USE OR PERFORMANCE OF THIS SOFTWARE.
-
-



(flink-connector-kudu) 34/44: [BAHIR-308] Bump flink version to 1.15.3

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 6ff53e7c4d40f3986983439f35083407693026b5
Author: Joao Boto 
AuthorDate: Wed Mar 8 17:11:35 2023 +0100

[BAHIR-308] Bump flink version to 1.15.3
---
 flink-connector-kudu/pom.xml   | 16 ++---
 .../kudu/connector/ColumnSchemasFactory.java   |  1 -
 .../kudu/connector/CreateTableOptionsFactory.java  |  1 -
 .../connectors/kudu/connector/KuduFilterInfo.java  |  1 -
 .../connectors/kudu/connector/KuduTableInfo.java   |  3 +-
 .../convertor/RowResultRowDataConvertor.java   |  6 +---
 .../kudu/connector/failure/KuduFailureHandler.java |  1 -
 .../kudu/connector/reader/KuduReader.java  | 10 ++
 .../kudu/connector/reader/KuduReaderConfig.java|  3 +-
 .../writer/AbstractSingleOperationMapper.java  |  1 -
 .../kudu/connector/writer/KuduOperationMapper.java |  1 -
 .../kudu/connector/writer/KuduWriter.java  |  9 +
 .../kudu/connector/writer/KuduWriterConfig.java|  3 +-
 .../kudu/connector/writer/PojoOperationMapper.java |  6 +---
 .../connectors/kudu/format/KuduOutputFormat.java   |  1 -
 .../flink/connectors/kudu/streaming/KuduSink.java  |  1 -
 .../kudu/table/AbstractReadOnlyCatalog.java| 22 ++--
 .../flink/connectors/kudu/table/KuduCatalog.java   | 35 ---
 .../connectors/kudu/table/KuduTableFactory.java| 28 +++
 .../connectors/kudu/table/KuduTableSource.java | 13 ++-
 .../kudu/table/UpsertOperationMapper.java  |  1 -
 .../kudu/table/dynamic/KuduDynamicTableSource.java | 40 +++---
 .../table/dynamic/catalog/KuduDynamicCatalog.java  | 32 +++--
 .../kudu/table/utils/KuduTableUtils.java   | 12 ++-
 .../connectors/kudu/table/utils/KuduTypeUtils.java | 14 +---
 .../connectors/kudu/connector/KuduTestBase.java| 19 --
 .../kudu/format/KuduOutputFormatTest.java  |  3 +-
 .../connectors/kudu/streaming/KuduSinkTest.java|  1 -
 .../connectors/kudu/table/KuduCatalogTest.java |  1 -
 .../kudu/table/KuduTableFactoryTest.java   | 11 ++
 .../kudu/table/KuduTableSourceITCase.java  |  4 +--
 .../connectors/kudu/table/KuduTableSourceTest.java | 18 ++
 .../connectors/kudu/table/KuduTableTestUtils.java  |  4 +--
 .../kudu/writer/AbstractOperationTest.java |  8 +
 .../kudu/writer/PojoOperationMapperTest.java   |  3 +-
 .../kudu/writer/RowOperationMapperTest.java|  1 -
 .../kudu/writer/TupleOpertaionMapperTest.java  |  1 -
 37 files changed, 78 insertions(+), 257 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 20b16b4..134d6f7 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -36,13 +36,6 @@
 
   
 
-  
-org.apache.kudu
-kudu-binary
-${kudu.version}
-${os.detected.classifier}
-test
-  
   
 org.apache.kudu
 kudu-client
@@ -85,11 +78,11 @@
   
 
   org.apache.flink
-  flink-clients_${scala.binary.version}
+  flink-clients
 
 
   org.apache.flink
-  flink-streaming-java_${scala.binary.version}
+  flink-streaming-java
 
 
   org.apache.flink
@@ -99,11 +92,6 @@
   org.apache.flink
   flink-table-planner_${scala.binary.version}
 
-
-  org.apache.kudu
-  kudu-binary
-  ${os.detected.classifier}
-
 
   org.apache.kudu
   kudu-client
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
index b178308..4997938 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.ColumnSchema;
 
 import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
index 4a475e9..fd9bfa4 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.CreateTableOptions;
 

(flink-connector-kudu) 18/44: BAHIR-240: replace docker test by testcontainer (#89)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 3f9db639b956585ba4e2a7bc99557646d1fd53ff
Author: Joao Boto 
AuthorDate: Tue Jul 28 17:41:33 2020 +0200

BAHIR-240: replace docker test by testcontainer (#89)
---
 flink-connector-kudu/pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 3bbefee..bbf168e 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -32,7 +32,6 @@
   
 1.11.1
 1.10.19
-!DockerTest
   
 
   



(flink-connector-kudu) 06/44: [BAHIR-180] Improve eventual consistence for Kudu connector

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 247dfe99486f04622b4ee7d83fa51ef253aaa75d
Author: eskabetxe 
AuthorDate: Tue Jan 15 13:05:29 2019 +0100

[BAHIR-180] Improve eventual consistence for Kudu connector

Closes #40
---
 flink-connector-kudu/pom.xml   |   2 +-
 .../streaming/connectors/kudu/KuduInputFormat.java |  27 ++---
 .../connectors/kudu/KuduOutputFormat.java  |  36 +++---
 .../flink/streaming/connectors/kudu/KuduSink.java  |  34 +++---
 .../connectors/kudu/connector/KuduColumnInfo.java  |  50 
 .../connectors/kudu/connector/KuduConnector.java   |  81 -
 .../connectors/kudu/connector/KuduMapper.java  |  59 +
 .../connectors/kudu/connector/KuduRow.java |  72 ++-
 .../connectors/kudu/connector/KuduRowIterator.java |  57 +
 .../connectors/kudu/serde/DefaultSerDe.java|  39 ++
 .../connectors/kudu/serde/KuduDeserialization.java |  25 
 .../connectors/kudu/serde/KuduSerialization.java   |  28 +
 .../streaming/connectors/kudu/serde/PojoSerDe.java | 134 +
 .../connectors/kudu/KuduOuputFormatTest.java   |  11 +-
 .../streaming/connectors/kudu/KuduSinkTest.java|  11 +-
 .../connectors/kudu/connector/KuduDatabase.java|   2 +-
 .../connectors/kudu/serde/PojoSerDeTest.java   |  57 +
 17 files changed, 543 insertions(+), 182 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 5540fc1..61ab4a6 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
   jar
 
   
-1.8.0
+1.7.1
 
 !DockerTest
   
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
index 617e317..fd126d0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
@@ -24,7 +24,9 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.streaming.connectors.kudu.connector.*;
 import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.*;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.LocatedTablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,8 +45,7 @@ public class KuduInputFormat extends RichInputFormat implements 
OutputFormat {
+public class KuduOutputFormat extends RichOutputFormat {
 
 private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
 
@@ -36,10 +37,12 @@ public class KuduOutputFormat implements OutputFormat
 private KuduConnector.Consistency consistency;
 private KuduConnector.WriteMode writeMode;
 
-private transient KuduConnector tableContext;
+private KuduSerialization serializer;
 
+private transient KuduConnector connector;
 
-public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) {
+
+public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization serializer) {
 Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
 this.kuduMasters = kuduMasters;
 
@@ -47,8 +50,10 @@ public class KuduOutputFormat 
implements OutputFormat
 this.tableInfo = tableInfo;
 this.consistency = KuduConnector.Consistency.STRONG;
 this.writeMode = KuduConnector.WriteMode.UPSERT;
+this.serializer = serializer.withSchema(tableInfo.getSchema());
 }
 
+
 public KuduOutputFormat withEventualConsistency() {
 this.consistency = KuduConnector.Consistency.EVENTUAL;
 return this;
@@ -81,28 +86,31 @@ public class KuduOutputFormat 
implements OutputFormat
 
 @Override
 public void open(int taskNumber, int numTasks) throws IOException {
-startTableContext();
-}
-
-private void startTableContext() throws IOException {
-if (tableContext != null) return;
-tableContext = new KuduConnector(kuduMasters, tableInfo);
+if (connector != null) return;
+connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+serializer = serializer.withSchema(tableInfo.getSchema());
 }
 
 @Override
-public void writeRecord(OUT kuduRow) throws IOException {
+public void writeRecord(OUT row) throws IOException {
+boolean response;
 try {
-tableContext.writeRow(kuduRow, consistency, writeMode);
+KuduRow kuduRow = serializer.serialize(row);
+response = connecto
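The hunk above is truncated; for orientation, the constructor and fluent setters introduced here combine roughly as follows (a sketch based only on what is visible in this diff; the generic parameter is illustrative because angle brackets were stripped in transit):

    import org.apache.flink.streaming.connectors.kudu.KuduOutputFormat;
    import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
    import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
    import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;

    class ConsistencyExample {
        static KuduOutputFormat<KuduRow> eventualSink(String masters, KuduTableInfo table) {
            // STRONG consistency and UPSERT write mode are the constructor
            // defaults; the fluent setter opts this sink into eventual
            // consistency instead.
            return new KuduOutputFormat<>(masters, table, new DefaultSerDe())
                    .withEventualConsistency();
        }
    }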

(flink-connector-kudu) 11/44: [BAHIR-207] Add tests for scala 2.12 on travis (#59)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 47133a3e4873c0a4295ed4c002d805fbd3304408
Author: Joao Boto 
AuthorDate: Thu Jul 4 00:13:51 2019 +0200

[BAHIR-207] Add tests for scala 2.12 on travis (#59)
---
 .../apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java | 6 +++---
 .../org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java| 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
index b9aaa40..4e91310 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
@@ -45,7 +45,7 @@ public class KuduOuputFormatTest extends KuduDatabase {
 public void testNotTableExist() throws IOException {
 String masterAddresses = harness.getMasterAddressesAsString();
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe());
+KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe());
 Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
 }
 
@@ -54,7 +54,7 @@ public class KuduOuputFormatTest extends KuduDatabase {
 String masterAddresses = harness.getMasterAddressesAsString();
 
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
+KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
 .withStrongConsistency();
 outputFormat.open(0,1);
 
@@ -74,7 +74,7 @@ public class KuduOuputFormatTest extends KuduDatabase {
 String masterAddresses = harness.getMasterAddressesAsString();
 
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
+KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
 .withEventualConsistency();
 outputFormat.open(0,1);
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
index 83e060d..225bf7c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
@@ -58,7 +58,7 @@ public class KuduSinkTest extends KuduDatabase {
 public void testNotTableExist() throws IOException {
 String masterAddresses = harness.getMasterAddressesAsString();
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe());
+KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe());
 sink.setRuntimeContext(context);
 Assertions.assertThrows(UnsupportedOperationException.class, () -> 
sink.open(new Configuration()));
 }
@@ -68,7 +68,7 @@ public class KuduSinkTest extends KuduDatabase {
 String masterAddresses = harness.getMasterAddressesAsString();
 
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
+KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
 .withStrongConsistency();
 sink.setRuntimeContext(context);
 sink.open(new Configuration());
@@ -88,7 +88,7 @@ public class KuduSinkTest extends KuduDatabase {
 String masterAddresses = harness.getMasterAddressesAsString();
 
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
+KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
 .withEventualConsistency();
 sink.setRuntimeContext(context);
 sink.open(new Configuration());



(flink-connector-kudu) 37/44: [FLINK-34930] Adapt POM files to the new structure, remove flink-shaded Guava usage

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 033554484423caa9ff1fbd830dc3b15b066b8463
Author: Ferenc Csaky 
AuthorDate: Thu Mar 28 12:30:27 2024 +0100

[FLINK-34930] Adapt POM files to the new structure, remove flink-shaded Guava usage
---
 flink-connector-kudu/pom.xml   | 237 ++-
 .../flink/connectors/kudu/table/KuduCatalog.java   |   3 +-
 .../connectors/kudu/table/KuduTableSource.java |   5 +-
 .../function/lookup/KuduRowDataLookupFunction.java |   4 +-
 .../kudu/table/utils/KuduTableUtils.java   |   3 +-
 pom.xml| 324 +
 6 files changed, 423 insertions(+), 153 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 8dc4f88..22817ff 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -1,148 +1,97 @@
 
 
-http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  4.0.0
-
-  
-org.apache.bahir
-bahir-flink-parent
-1.2-SNAPSHOT
-../pom.xml
-  
-
-  flink-connector-kudu
-  jar
-
-  flink-connector-kudu
-
-  
-1.13.0
-  
-
-  
-
-  
-org.apache.kudu
-kudu-client
-${kudu.version}
-  
-  
-org.apache.kudu
-kudu-test-utils
-${kudu.version}
-test
-  
-  
-org.apache.logging.log4j
-log4j-api
-${log4j.version}
-test
-  
-  
-org.apache.logging.log4j
-log4j-core
-${log4j.version}
-test
-  
-  
-org.apache.logging.log4j
-log4j-slf4j-impl
-${log4j.version}
-test
-  
-  
-  
-org.junit.jupiter
-junit-jupiter-migrationsupport
-${junit.jupiter.version}
-test
-  
-
-  
-
-  
-
-  org.apache.flink
-  flink-clients
-
-
-  org.apache.flink
-  flink-streaming-java
-
-
-  org.apache.flink
-  flink-table-api-java-bridge
-
-
-  org.apache.flink
-  flink-table-common
-
-
-  org.apache.flink
-  flink-table-planner-loader
-
-
-  org.apache.flink
-  flink-table-runtime
-
-
-  org.apache.kudu
-  kudu-client
-
-
-  org.apache.kudu
-  kudu-test-utils
-
-
-  org.apache.logging.log4j
-  log4j-api
-
-
-  org.apache.logging.log4j
-  log4j-core
-
-
-  org.apache.logging.log4j
-  log4j-slf4j-impl
-
-
-
-  org.junit.jupiter
-  junit-jupiter-migrationsupport
-
-
-  org.mockito
-  mockito-all
-
-
-  org.testcontainers
-  testcontainers
-
-  
-
-  
-
-  
-kr.motd.maven
-os-maven-plugin
-1.6.2
-  
-
-  
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-kudu-parent
+   2.0-SNAPSHOT
+   
+
+   flink-connector-kudu
+   Flink : Connectors : Kudu
+   jar
+
+   
+   
+   org.apache.flink
+   flink-clients
+   
+
+   
+   org.apache.flink
+   flink-streaming-java
+   
+
+   
+   org.apache.flink
+   flink-table-api-java-bridge
+   
+
+   
+   org.apache.flink
+   flink-table-common
+   
+
+   
+   org.apache.flink
+   flink-table-planner-loader
+   
+
+   
+   org.apache.

(flink-connector-kudu) 39/44: [FLINK-34930] Rename base package

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 34cb67fd62aefc03ed67ee75731ef2a2c82b1fff
Author: Ferenc Csaky 
AuthorDate: Wed Apr 10 16:27:19 2024 +0200

[FLINK-34930] Rename base package
---
 .../kudu/connector/ColumnSchemasFactory.java   |  2 +-
 .../kudu/connector/CreateTableOptionsFactory.java  |  2 +-
 .../kudu/connector/KuduFilterInfo.java |  2 +-
 .../kudu/connector/KuduTableInfo.java  |  2 +-
 .../connector/converter/RowResultConverter.java|  2 +-
 .../connector/converter/RowResultRowConverter.java |  2 +-
 .../converter/RowResultRowDataConverter.java   |  2 +-
 .../failure/DefaultKuduFailureHandler.java |  2 +-
 .../kudu/connector/failure/KuduFailureHandler.java |  5 ++-
 .../kudu/connector/reader/KuduInputSplit.java  |  2 +-
 .../kudu/connector/reader/KuduReader.java  |  8 ++--
 .../kudu/connector/reader/KuduReaderConfig.java|  7 ++--
 .../kudu/connector/reader/KuduReaderIterator.java  |  4 +-
 .../writer/AbstractSingleOperationMapper.java  |  2 +-
 .../kudu/connector/writer/KuduOperationMapper.java |  2 +-
 .../kudu/connector/writer/KuduWriter.java  |  8 ++--
 .../kudu/connector/writer/KuduWriterConfig.java|  9 +++--
 .../kudu/connector/writer/PojoOperationMapper.java |  2 +-
 .../writer/RowDataUpsertOperationMapper.java   |  2 +-
 .../kudu/connector/writer/RowOperationMapper.java  |  2 +-
 .../connector/writer/TupleOperationMapper.java |  2 +-
 .../kudu/format/AbstractKuduInputFormat.java   | 18 -
 .../kudu/format/KuduOutputFormat.java  | 14 +++
 .../kudu/format/KuduRowDataInputFormat.java| 10 ++---
 .../kudu/format/KuduRowInputFormat.java| 10 ++---
 .../kudu/streaming/KuduSink.java   | 14 +++
 .../kudu/table/AbstractReadOnlyCatalog.java|  2 +-
 .../kudu/table/KuduCatalog.java| 25 ++---
 .../kudu/table/KuduTableFactory.java   | 10 ++---
 .../kudu/table/KuduTableSink.java  |  8 ++--
 .../kudu/table/KuduTableSource.java| 17 -
 .../kudu/table/UpsertOperationMapper.java  |  4 +-
 .../kudu/table/dynamic/KuduDynamicTableSink.java   | 10 ++---
 .../kudu/table/dynamic/KuduDynamicTableSource.java | 20 +-
 .../dynamic/KuduDynamicTableSourceSinkFactory.java | 12 +++---
 .../table/dynamic/catalog/KuduCatalogFactory.java  |  6 +--
 .../table/dynamic/catalog/KuduDynamicCatalog.java  | 18 -
 .../table/function/lookup/KuduLookupOptions.java   |  2 +-
 .../function/lookup/KuduRowDataLookupFunction.java | 20 +-
 .../kudu/table/utils/KuduTableUtils.java   | 43 +-
 .../kudu/table/utils/KuduTypeUtils.java|  2 +-
 .../org.apache.flink.table.factories.Factory   |  2 +-
 .../org.apache.flink.table.factories.TableFactory  |  4 +-
 .../kudu/connector/KuduFilterInfoTest.java |  2 +-
 .../kudu/connector/KuduTestBase.java   | 22 +--
 .../kudu/format/KuduOutputFormatTest.java  | 12 +++---
 .../kudu/format/KuduRowDataInputFormatTest.java| 16 
 .../kudu/format/KuduRowInputFormatTest.java| 12 +++---
 .../kudu/streaming/KuduSinkTest.java   | 12 +++---
 .../kudu/table/KuduCatalogTest.java| 14 +++
 .../kudu/table/KuduTableFactoryTest.java   |  8 ++--
 .../kudu/table/KuduTableSourceITCase.java  |  6 +--
 .../kudu/table/KuduTableSourceTest.java|  6 +--
 .../kudu/table/KuduTableTestUtils.java |  2 +-
 .../kudu/table/dynamic/KuduDynamicSinkTest.java|  6 +--
 .../kudu/table/dynamic/KuduDynamicSourceTest.java  |  6 +--
 .../dynamic/KuduRowDataLookupFunctionTest.java | 12 +++---
 .../kudu/writer/AbstractOperationTest.java |  4 +-
 .../kudu/writer/PojoOperationMapperTest.java   | 15 
 .../writer/RowDataUpsertOperationMapperTest.java   |  6 +--
 .../kudu/writer/RowOperationMapperTest.java|  8 ++--
 .../kudu/writer/TupleOperationMapperTest.java  |  8 ++--
 62 files changed, 264 insertions(+), 255 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/connector/ColumnSchemasFactory.java
similarity index 96%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/connector/ColumnSchemasFactory.java
index 194af17..95fe97d 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/connector/ColumnSchemasFa

(flink-connector-kudu) 28/44: [BAHIR-302] Group declaration of flink dependencies on parent pom

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 285a6e133745d462fb1773d203026594a4cce802
Author: Joao Boto 
AuthorDate: Mon May 2 20:00:15 2022 +0200

[BAHIR-302] Group declaration of flink dependencies on parent pom
---
 flink-connector-kudu/pom.xml | 24 
 1 file changed, 24 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 9ace382..6f68522 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -36,30 +36,6 @@
 
   
 
-  
-org.apache.flink
-flink-clients_${scala.binary.version}
-${flink.version}
-test
-  
-  
-org.apache.flink
-flink-streaming-java_${scala.binary.version}
-${flink.version}
-provided
-  
-  
-org.apache.flink
-flink-table-common
-${flink.version}
-provided
-  
-  
-org.apache.flink
-flink-table-planner_${scala.binary.version}
-${flink.version}
-provided
-  
   
 org.apache.kudu
 kudu-binary



(flink-connector-kudu) 35/44: [BAHIR-308] Remove scala prefix where we can

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 78c76c544ab426461e9e4156ebb9ddf561b7b97c
Author: Joao Boto 
AuthorDate: Thu Mar 23 13:30:24 2023 +0100

[BAHIR-308] Remove scala prefix where we can
---
 flink-connector-kudu/pom.xml   | 14 +++---
 .../kudu/table/dynamic/KuduDynamicTableSource.java |  3 +--
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 134d6f7..8dc4f88 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -20,12 +20,12 @@
 
   
 org.apache.bahir
-bahir-flink-parent_2.12
+bahir-flink-parent
 1.2-SNAPSHOT
 ../pom.xml
   
 
-  flink-connector-kudu_2.12
+  flink-connector-kudu
   jar
 
   flink-connector-kudu
@@ -84,13 +84,21 @@
   org.apache.flink
   flink-streaming-java
 
+
+  org.apache.flink
+  flink-table-api-java-bridge
+
 
   org.apache.flink
   flink-table-common
 
 
   org.apache.flink
-  flink-table-planner_${scala.binary.version}
+  flink-table-planner-loader
+
+
+  org.apache.flink
+  flink-table-runtime
 
 
   org.apache.kudu
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
index cde6a13..da6bebb 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly;
 
 /**
@@ -143,7 +142,7 @@ public class KuduDynamicTableSource implements ScanTableSource, SupportsProjecti
 }
 
 private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
-checkArgument(
+Preconditions.checkArgument(
 containsPhysicalColumnsOnly(tableSchema),
 "Projection is only supported for physical columns.");
 TableSchema.Builder builder = TableSchema.builder();



(flink-connector-kudu) 20/44: [BAHIR-263] Update flink to 1.12.2 (#115)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 4a5ec773266d76568d89b5f291861307f64810fb
Author: Joao Boto 
AuthorDate: Fri Mar 12 08:53:06 2021 +0100

[BAHIR-263] Update flink to 1.12.2 (#115)
---
 .../connectors/kudu/table/KuduCatalogTest.java | 16 ++---
 .../kudu/table/KuduTableFactoryTest.java   | 12 +-
 .../connectors/kudu/table/KuduTableSourceTest.java |  2 +-
 .../src/test/resources/log4j.properties| 27 ++
 4 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 4bb1871..2bc8b12 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -77,7 +77,7 @@ public class KuduCatalogTest extends KuduTestBase {
 tableEnv.executeSql("INSERT INTO TestTable1 VALUES ('f', 's')")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 
 // Add this once Primary key support has been enabled
@@ -101,7 +101,7 @@ public class KuduCatalogTest extends KuduTestBase {
 tableEnv.executeSql("INSERT INTO TestTable3 VALUES ('f', 2, 't')")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 
 validateMultiKey("TestTable3");
@@ -113,14 +113,14 @@ public class KuduCatalogTest extends KuduTestBase {
 tableEnv.executeSql("INSERT INTO TestTable5 VALUES ('s', 'f', 't')")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 
 tableEnv.executeSql("CREATE TABLE TestTable6 (`first` STRING, `second` 
String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 
'first')");
 tableEnv.executeSql("INSERT INTO TestTable6 (SELECT `first`, `second` 
FROM  TestTable5)")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 
 validateSingleKey("TestTable6");
@@ -133,12 +133,12 @@ public class KuduCatalogTest extends KuduTestBase {
 tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f','s')")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f2','s2')")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 
 Table result = tableEnv.sqlQuery("SELECT COUNT(*) FROM TestTableEP");
@@ -225,7 +225,7 @@ public class KuduCatalogTest extends KuduTestBase {
 tableEnv.executeSql("INSERT INTO TestTableTsC values ('f', TIMESTAMP 
'2020-01-01 12:12:12.123456')")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 
 KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
@@ -252,7 +252,7 @@ public class KuduCatalogTest extends KuduTestBase {
 "TIMESTAMP '2020-04-15 12:34:56.123') ")
 .getJobClient()
 .get()
-.getJobExecutionResult(getClass().getClassLoader())
+.getJobExecutionResult()
 .get(1, TimeUnit.MINUTES);
 
 validateManyTypes("TestTable8");
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index d852f8e..d4de7f6 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -64,7 +64,7 @@ public class KuduTableFactoryTest extends KuduTestBase {
 "WITH ('connector.t

(flink-connector-kudu) 05/44: [BAHIR-194] bump kudu version to 1.8.0

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit f0cd1d9243abb110ce312a82adb106afe233e078
Author: Joao Boto 
AuthorDate: Sun Feb 10 23:38:43 2019 +0100

[BAHIR-194] bump kudu version to 1.8.0

Closes 44
---
 flink-connector-kudu/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 61ab4a6..5540fc1 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
   jar
 
   
-1.7.1
+1.8.0
 
 !DockerTest
   



(flink-connector-kudu) 36/44: [BAHIR-324] Closing KuduReader at JobManager

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit d8c803af8921d4c26865143f5ea3976f43a163f2
Author: Shimin Huang <40719512+coll...@users.noreply.github.com>
AuthorDate: Wed May 31 00:37:14 2023 +0800

[BAHIR-324] Closing KuduReader at JobManager
---
 .../kudu/format/AbstractKuduInputFormat.java   | 22 +++---
 .../function/lookup/KuduRowDataLookupFunction.java |  2 +-
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
index 0cdf570..4976241 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
@@ -104,19 +104,23 @@ public abstract class AbstractKuduInputFormat extends RichInputFormat {
 ArrayList rows = new ArrayList<>();
 for (KuduInputSplit inputSplit : inputSplits) {
 KuduReaderIterator scanner = kuduReader.scanner(inputSplit.getScanToken());
-// 没有启用cache
+// not use cache
 if (cache == null) {
 while (scanner.hasNext()) {
 collect(scanner.next());



(flink-connector-kudu) 19/44: [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 7b5d717b609ca68cd8e812773585c2c295946619
Author: Gyula Fora 
AuthorDate: Thu Sep 24 20:16:10 2020 +0200

[BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)

Co-authored-by: Gyula Fora 
---
 flink-connector-kudu/pom.xml   | 32 ---
 .../kudu/connector/reader/KuduReader.java  |  2 +-
 .../kudu/connector/writer/KuduWriter.java  |  2 +-
 .../connectors/kudu/table/KuduTableFactory.java| 55 +++-
 .../flink/connectors/kudu/table/KuduTableSink.java |  3 -
 ...utFormatTest.java => KuduOutputFormatTest.java} |  4 +-
 .../connectors/kudu/streaming/KuduSinkTest.java|  2 +-
 .../connectors/kudu/table/KuduCatalogTest.java | 98 +-
 .../kudu/table/KuduTableFactoryTest.java   | 89 +++-
 .../kudu/table/KuduTableSourceITCase.java  | 16 ++--
 .../connectors/kudu/table/KuduTableSourceTest.java |  5 +-
 .../connectors/kudu/table/KuduTableTestUtils.java  |  2 +-
 .../{log4j.properties => log4j2-test.properties}   | 17 ++--
 13 files changed, 170 insertions(+), 157 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index bbf168e..a76102e 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
   jar
 
   
-1.11.1
+1.13.0
 1.10.19
   
 
@@ -79,6 +79,13 @@
   test
 
 
+
+org.apache.flink
+flink-clients_${scala.binary.version}
+${flink.version}
+test
+
+
 
 
   org.junit.jupiter
@@ -95,17 +102,22 @@
 
 
 
-  org.slf4j
-  slf4j-log4j12
-  ${slf4j.version}
-  runtime
+  org.apache.logging.log4j
+  log4j-api
+  ${log4j2.version}
+   test
 
-
 
-  log4j
-  log4j
-  ${log4j.version}
-  runtime
+  org.apache.logging.log4j
+  log4j-core
+  ${log4j2.version}
+   test
+
+
+  org.apache.logging.log4j
+  log4j-slf4j-impl
+  ${log4j2.version}
+  test
 
 
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 51ab748..d7a0c61 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -82,7 +82,7 @@ public class KuduReader implements AutoCloseable {
 if (tableInfo.getCreateTableIfNotExists()) {
 return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
 }
-throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+throw new RuntimeException("Table " + tableName + " does not exist.");
 }
 
 public KuduReaderIterator scanner(byte[] token) throws IOException {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 7233478..03c37ea 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -83,7 +83,7 @@ public class KuduWriter implements AutoCloseable {
 if (tableInfo.getCreateTableIfNotExists()) {
 return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
 }
-throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+throw new RuntimeException("Table " + tableName + " does not exist.");
 }
 
 public void write(T input) throws IOException {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index eb72205..1961aad 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.types.Row;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,25 +37,10 @@ import java.util.

(flink-connector-kudu) 32/44: [BAHIR-321] Make KuduFilterInfo handle String literals

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 2e75a60111bd48d670e94b8b6fc1b2eb52fd70fe
Author: Balazs Varga 
AuthorDate: Tue Jan 3 18:07:55 2023 +0100

[BAHIR-321] Make KuduFilterInfo handle String literals
---
 .../connectors/kudu/connector/KuduFilterInfo.java  |  4 +-
 .../function/lookup/KuduRowDataLookupFunction.java |  8 +--
 .../kudu/connector/KuduFilterInfoTest.java | 41 
 .../connectors/kudu/connector/KuduTestBase.java|  2 +-
 .../kudu/table/dynamic/KuduDynamicSourceTest.java  | 57 ++
 5 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index 08fa86b..e7a8d16 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -17,6 +17,7 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.binary.BinaryStringData;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -65,7 +66,8 @@ public class KuduFilterInfo implements Serializable {
 
 switch (column.getType()) {
 case STRING:
-predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value);
+predicate = KuduPredicate.newComparisonPredicate(column, comparison,
+(this.value instanceof BinaryStringData) ? this.value.toString() : (String) this.value);
 break;
 case FLOAT:
 predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) this.value);
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
index 4a4a952..d54f23a 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -178,9 +178,11 @@ public class KuduRowDataLookupFunction extends TableFunction {
 if (null != this.kuduReader) {
 try {
 this.kuduReader.close();
-this.cache.cleanUp();
-// help gc
-this.cache = null;
+if (cache != null) {
+this.cache.cleanUp();
+// help gc
+this.cache = null;
+}
 this.kuduReader = null;
 } catch (IOException e) {
 // ignore exception when close.
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java
new file mode 100644
index 000..a6c2ff7
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+public class KuduFilterInfoTest {
+
+@Test
+void testKuduFilterInfoWithBinaryStringData() {
+String filterValue = "someValue";
+
+KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create("col")
+.equalTo(BinaryStringData.fromStri
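The new test is cut off above; its essence, as a standalone sketch (the class below is illustrative, not the committed test): values arriving from the SQL runtime are Flink-internal BinaryStringData, not java.lang.String, so the old unconditional (String) cast threw ClassCastException.

    import org.apache.flink.table.data.binary.BinaryStringData;

    class StringLiteralDemo {
        public static void main(String[] args) {
            Object value = BinaryStringData.fromString("someValue");
            // Old behavior: (String) value -> ClassCastException at runtime.
            // The fix converts internal string data explicitly, as the main
            // diff above shows.
            String forKudu =
                    (value instanceof BinaryStringData) ? value.toString() : (String) value;
            System.out.println(forKudu); // someValue
        }
    }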

(flink-connector-kudu) 33/44: [BAHIR-308] Remove support for scala 2.11

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 5a1da3182cbf2c2afc0800a9da750c031a9915b2
Author: Joao Boto 
AuthorDate: Wed Jul 6 16:42:06 2022 +0200

[BAHIR-308] Remove support for scala 2.11
---
 flink-connector-kudu/pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index fd485d5..20b16b4 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -20,12 +20,12 @@

   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <artifactId>bahir-flink-parent_2.12</artifactId>
     <version>1.2-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

-  <artifactId>flink-connector-kudu_2.11</artifactId>
+  <artifactId>flink-connector-kudu_2.12</artifactId>
   <packaging>jar</packaging>

   <name>flink-connector-kudu</name>



(flink-connector-kudu) 26/44: [BAHIR-302] Add enforcers

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit e9c2a761dc5461e7fe229d837daf412a3e25fd99
Author: Joao Boto 
AuthorDate: Mon Mar 28 17:18:05 2022 +0200

[BAHIR-302] Add enforcers
---
 flink-connector-kudu/pom.xml | 143 ---
 1 file changed, 92 insertions(+), 51 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 750929c..2a2dde2 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -22,99 +22,140 @@
     <groupId>org.apache.bahir</groupId>
     <artifactId>bahir-flink-parent_2.11</artifactId>
     <version>1.1-SNAPSHOT</version>
-    <relativePath>..</relativePath>
+    <relativePath>../pom.xml</relativePath>
   </parent>

   <artifactId>flink-connector-kudu_2.11</artifactId>
-  <name>flink-connector-kudu</name>
   <packaging>jar</packaging>

+  <name>flink-connector-kudu</name>
+
   <properties>
     <kudu.version>1.13.0</kudu.version>
   </properties>

-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.kudu</groupId>
-      <artifactId>kudu-client</artifactId>
-      <version>${kudu.version}</version>
-    </dependency>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-clients_${scala.binary.version}</artifactId>
+        <version>${flink.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+        <version>${flink.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table-common</artifactId>
+        <version>${flink.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+        <version>${flink.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kudu</groupId>
+        <artifactId>kudu-binary</artifactId>
+        <version>${kudu.version}</version>
+        <classifier>${os.detected.classifier}</classifier>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kudu</groupId>
+        <artifactId>kudu-client</artifactId>
+        <version>${kudu.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kudu</groupId>
+        <artifactId>kudu-test-utils</artifactId>
+        <version>${kudu.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-api</artifactId>
+        <version>${log4j.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-core</artifactId>
+        <version>${log4j.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-slf4j-impl</artifactId>
+        <version>${log4j.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.junit.jupiter</groupId>
+        <artifactId>junit-jupiter-migrationsupport</artifactId>
+        <version>${junit.jupiter.version}</version>
+        <scope>test</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>

+  <dependencies>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
+      <artifactId>flink-clients_${scala.binary.version}</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-common</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
     </dependency>
-
-
     <dependency>
-      <groupId>org.apache.kudu</groupId>
-      <artifactId>kudu-test-utils</artifactId>
-      <version>${kudu.version}</version>
-      <scope>test</scope>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-binary</artifactId>
-      <version>${kudu.version}</version>
       <classifier>${os.detected.classifier}</classifier>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>test</scope>
     </dependency>
-
     <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-migrationsupport</artifactId>
-      <version>${junit.jupiter.version}</version>
-      <scope>test</scope>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
     </dependency>
-
     <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-test-utils</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
-      <version>${log4j.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-core</artifactId>
-      <version>${log4j.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
-      <version>${log4j.version}</version>
-      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-migrationsupport</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
     </dependency>
   </dependencies>
 



(flink-connector-kudu) 21/44: [BAHIR-260] Add kudu table writer config (#109)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 65004752884890ee49dbffd5bfb52a2a863758ed
Author: hackergin 
AuthorDate: Wed Apr 7 06:35:07 2021 -0500

[BAHIR-260] Add kudu table writer config  (#109)
---
 .../connectors/kudu/connector/KuduTableInfo.java   |  18 
 .../kudu/connector/writer/KuduWriter.java  |   5 +
 .../kudu/connector/writer/KuduWriterConfig.java| 113 -
 .../connectors/kudu/table/KuduTableFactory.java|  49 -
 .../flink/connectors/kudu/table/KuduTableSink.java |  21 
 .../kudu/table/KuduTableFactoryTest.java   |  44 +++-
 6 files changed, 243 insertions(+), 7 deletions(-)

diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index 83c7dde..baae8a0 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -23,6 +23,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
 * Describes which table should be used in sources and sinks along with specifications
@@ -103,4 +104,21 @@ public class KuduTableInfo implements Serializable {
 }
 return createTableOptionsFactory.getCreateTableOptions();
 }
+
+@Override
+public int hashCode() {
+return Objects.hash(name);
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+KuduTableInfo that = (KuduTableInfo) o;
+return Objects.equals(this.name, that.name);
+}
 }
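
Since equality and the hash code are now derived from the table name alone, two KuduTableInfo instances describing the same table collapse into one entry in hash-based collections, whatever their other settings. A small illustrative snippet (assuming the builder API shown elsewhere in this repository):

    Set<KuduTableInfo> tables = new HashSet<>();
    tables.add(KuduTableInfo.Builder.create("books").build());
    tables.add(KuduTableInfo.Builder.create("books").build());
    // tables.size() == 1: equality is decided by the table name only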
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 03c37ea..59ad196 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -72,6 +72,11 @@ public class KuduWriter implements AutoCloseable {
 private KuduSession obtainSession() {
 KuduSession session = client.newSession();
 session.setFlushMode(writerConfig.getFlushMode());
+session.setTimeoutMillis(writerConfig.getOperationTimeout());
+session.setMutationBufferSpace(writerConfig.getMaxBufferSize());
+session.setFlushInterval(writerConfig.getFlushInterval());
+session.setIgnoreAllDuplicateRows(writerConfig.isIgnoreDuplicate());
+session.setIgnoreAllNotFoundRows(writerConfig.isIgnoreNotFound());
 return session;
 }
 
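With this change every writer tuning knob reaches the Kudu session when it is opened. A hedged sketch of the equivalent calls against the raw Kudu client API (master address and values are illustrative, not defaults):

    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.SessionConfiguration.FlushMode;

    public class SessionTuning {
        public static void main(String[] args) throws Exception {
            try (KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()) {
                KuduSession session = client.newSession();
                session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND);
                session.setTimeoutMillis(30_000);        // operation timeout
                session.setMutationBufferSpace(1_000);   // max buffered operations
                session.setFlushInterval(1_000);         // ms between background flushes
                session.setIgnoreAllDuplicateRows(true); // tolerate duplicate-key rows
                session.setIgnoreAllNotFoundRows(false); // fail on missing rows
            }
        }
    }
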
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 598f8d0..ff93921 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -19,8 +19,10 @@ package org.apache.flink.connectors.kudu.connector.writer;
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.kudu.client.AsyncKuduClient;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.kudu.client.SessionConfiguration.FlushMode;
@@ -34,13 +36,28 @@ public class KuduWriterConfig implements Serializable {
 
 private final String masters;
 private final FlushMode flushMode;
+private final long operationTimeout;
+private int maxBufferSize;
+private int flushInterval;
+private boolean ignoreNotFound;
+private boolean ignoreDuplicate;
 
 private KuduWriterConfig(
 String masters,
-FlushMode flushMode) {
+FlushMode flushMode,
+long operationTimeout,
+int maxBufferSize,
+int flushInterval,
+boolean ignoreNotFound,
+boolean ignoreDuplicate) {
 
 this.masters = checkNotNull(masters, "Kudu masters cannot be null");
+this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
+this.operationTimeout = operationTimeout;
+this.maxBufferSize = max

(flink-connector-kudu) 12/44: [BAHIR-214] Improve speed and solve eventual consistence issues (#64)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 73e87f9c2e9d3c5ef59bbe723a930b2cf5cffc1d
Author: Joao Boto 
AuthorDate: Tue Sep 3 02:05:56 2019 +0200

[BAHIR-214] Improve speed and solve eventual consistence issues (#64)

* resolve eventual consistency issues
* improve speed, especially on the eventual-consistency stream
* update README
---
 flink-connector-kudu/README.md |  60 +++---
 .../connectors/kudu/batch/KuduInputFormat.java | 137 +
 .../connectors/kudu/batch/KuduOutputFormat.java|  93 +
 .../connectors/kudu/connector/KuduColumnInfo.java  |   4 +-
 .../connectors/kudu/connector/KuduFilterInfo.java  |   5 +-
 .../connectors/kudu/connector/KuduRow.java |   4 +-
 .../connectors/kudu/connector/KuduTableInfo.java   |   4 +-
 .../failure/DefaultKuduFailureHandler.java |  33 
 .../kudu/connector/failure/KuduFailureHandler.java |  37 
 .../kudu/connector/reader/KuduInputSplit.java  |  39 
 .../kudu/connector/reader/KuduReader.java  | 170 +
 .../kudu/connector/reader/KuduReaderConfig.java|  82 
 .../kudu/connector/reader/KuduReaderIterator.java  | 112 +++
 .../kudu/connector}/serde/DefaultSerDe.java|   4 +-
 .../kudu/connector}/serde/KuduDeserialization.java |   4 +-
 .../kudu/connector}/serde/KuduSerialization.java   |   4 +-
 .../kudu/connector}/serde/PojoSerDe.java   |   6 +-
 .../kudu/connector/writer/KuduWriter.java  | 209 
 .../kudu/connector/writer/KuduWriterConfig.java| 113 +++
 .../connector/writer/KuduWriterConsistency.java|  32 
 .../kudu/connector/writer/KuduWriterMode.java} |  18 +-
 .../flink/connectors/kudu/streaming/KuduSink.java  |  89 +
 .../streaming/connectors/kudu/KuduInputFormat.java | 211 -
 .../connectors/kudu/KuduOutputFormat.java  | 121 
 .../flink/streaming/connectors/kudu/KuduSink.java  | 157 ---
 .../connectors/kudu/connector/KuduConnector.java   | 170 -
 .../connectors/kudu/connector/KuduMapper.java  | 143 --
 .../connectors/kudu/connector/KuduRowIterator.java |  57 --
 .../kudu/batch}/KuduInputFormatTest.java   |  41 ++--
 .../kudu/batch}/KuduOuputFormatTest.java   |  50 +++--
 .../connectors/kudu/connector/KuduDatabase.java|  48 -
 .../kudu/connector}/serde/PojoSerDeTest.java   |   8 +-
 .../connectors/kudu/streaming/KuduSinkTest.java| 159 
 .../streaming/connectors/kudu/KuduSinkTest.java| 109 ---
 34 files changed, 1467 insertions(+), 1066 deletions(-)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 9b75aa7..8692ca5 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -29,15 +29,19 @@ env.setParallelism(PARALLELISM);
 // create a table info object
 KuduTableInfo tableInfo = KuduTableInfo.Builder
 .create("books")
-.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
-.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
-.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
-.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+.addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
+.addColumn(KuduColumnInfo.Builder.createString("title").build())
+.addColumn(KuduColumnInfo.Builder.createString("author").build())
+.addColumn(KuduColumnInfo.Builder.createDouble("price").build())
+.addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
 .build();
-
+// create a reader configuration
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+.setMasters("172.25.0.6")
+.setRowLimit(1000)
+.build();
 // Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
-env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo))
+env.createInput(new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe()))
 .count();
 
 env.execute();
@@ -54,18 +58,23 @@ env.setParallelism(PARALLELISM);
 KuduTableInfo tableInfo = KuduTableInfo.Builder
 .create("books")
 .createIfNotExist(true)
-.replicas(1)
-.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
-.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
-.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()

(flink-connector-kudu) 31/44: [BAHIR-312] Add license header to README.md files

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit bef51ab2af1db28f82c4bea5864fd23202550bed
Author: Joao Boto 
AuthorDate: Sat Jul 30 17:45:39 2022 +0200

[BAHIR-312] Add license header to README.md files
---
 flink-connector-kudu/README.md | 16 
 1 file changed, 16 insertions(+)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index c52adb1..7908e08 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -1,3 +1,19 @@
+ 
+
 # Flink Kudu Connector
 
 This connector provides a source (```KuduInputFormat```), a sink/output



(flink-connector-kudu) 27/44: [BAHIR-243] Change KuduTestHarness with TestContainers

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 6832f05fd4b73ef3583e24e9968ccbd0faa4
Author: Joao Boto 
AuthorDate: Thu Mar 11 19:03:43 2021 +0100

[BAHIR-243] Change KuduTestHarness with TestContainers
---
 flink-connector-kudu/pom.xml   |  4 +
 .../connectors/kudu/batch/KuduInputFormatTest.java |  4 +-
 .../kudu/batch/KuduOutputFormatTest.java   |  8 +-
 .../connectors/kudu/connector/KuduTestBase.java| 85 +-
 .../connectors/kudu/streaming/KuduSinkTest.java| 11 ++-
 .../connectors/kudu/table/KuduCatalogTest.java | 20 ++---
 .../kudu/table/KuduTableFactoryTest.java   | 10 +--
 .../kudu/table/KuduTableSourceITCase.java  |  2 +-
 .../connectors/kudu/table/KuduTableSourceTest.java |  2 +-
 9 files changed, 99 insertions(+), 47 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 2a2dde2..9ace382 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -157,6 +157,10 @@
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+    </dependency>
   
 
   
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
index fb7d8e3..126f7fd 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
@@ -39,7 +39,7 @@ class KuduInputFormatTest extends KuduTestBase {
 
 @Test
 void testInvalidTableInfo() {
-String masterAddresses = harness.getMasterAddressesAsString();
+String masterAddresses = getMasterAddress();
 KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
 Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null));
 }
@@ -71,7 +71,7 @@ class KuduInputFormatTest extends KuduTestBase {
 }
 
 private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
-String masterAddresses = harness.getMasterAddressesAsString();
+String masterAddresses = getMasterAddress();
 KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
 KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(), fieldProjection == null ? null : Arrays.asList(fieldProjection));
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
index 22fa0a4..693e113 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
@@ -39,14 +39,14 @@ class KuduOutputFormatTest extends KuduTestBase {
 
 @Test
 void testInvalidTableInfo() {
-String masterAddresses = harness.getMasterAddressesAsString();
+String masterAddresses = getMasterAddress();
 KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
 Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, null));
 }
 
 @Test
 void testNotTableExist() {
-String masterAddresses = harness.getMasterAddressesAsString();
+String masterAddresses = getMasterAddress();
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), 
false);
 KuduWriterConfig writerConfig = 
KuduWriterConfig.Builder.setMasters(masterAddresses).build();
 KuduOutputFormat outputFormat = new 
KuduOutputFormat<>(writerConfig, tableInfo, new 
RowOperationMapper(KuduTestBase.columns, 
AbstractSingleOperationMapper.KuduOperation.INSERT));
@@ -55,7 +55,7 @@ class KuduOutputFormatTest extends KuduTestBase {
 
 @Test
 void testOutputWithStrongConsistency() throws Exception {
-String masterAddresses = harness.getMasterAddressesAsString();
+String masterAddresses = getMasterAddress();
 
 KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
 KuduWriterConfig writerConfig = KuduWriterConfig.Builder
@@ -80,7 +80,7 @@ class KuduOutputFormatTest extends KuduTestBase {
 
 @Test
 void testOutputWithEventualConsistency() throws Exception {
-String masterAddresses = harness.getMasterAddressesAsString();
+String masterAddresses = getMasterAddress();

(flink-connector-kudu) 01/44: [FLINK-34930] Initialize tools/ and vcs.xml

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 427098db43f40fde797efbeb16723eb4e7cef233
Author: Ferenc Csaky 
AuthorDate: Tue Mar 26 15:21:33 2024 +0100

[FLINK-34930] Initialize tools/ and vcs.xml
---
 .gitmodules  |   4 +
 .idea/vcs.xml|  25 ++
 tools/ci/log4j.properties|  43 
 tools/maven/checkstyle.xml   | 562 +++
 tools/maven/suppressions.xml |  26 ++
 5 files changed, 660 insertions(+)

diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 000..e5d40f3
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,4 @@
+[submodule "tools/releasing/shared"]
+   path = tools/releasing/shared
+   url = https://github.com/apache/flink-connector-shared-utils
+   branch = release_utils
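
Because the release tooling lives in a git submodule, a fresh clone needs the standard submodule bootstrap (ordinary git usage, not part of the commit):

    git clone https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
    cd flink-connector-kudu
    git submodule update --init tools/releasing/shared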
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 000..31d1734
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,25 @@
+
+
+  
+
+  
+
+  
+  https://issues.apache.org/jira/browse/$0"; />
+
+
+  
+  https://cwiki.apache.org/confluence/display/FLINK/$0"; />
+
+
+  
+  https://github.com/apache/flink-connector-kudu/pull/$1"; />
+
+  
+
+  
+  
+
+
+  
+
diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties
new file mode 100644
index 000..7daf1c3
--- /dev/null
+++ b/tools/ci/log4j.properties
@@ -0,0 +1,43 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+rootLogger.level = INFO
+rootLogger.appenderRef.out.ref = ConsoleAppender
+
+# -
+# Console (use 'console')
+# -
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n
+
+# -
+# File (use 'file')
+# -
+appender.file.name = FileAppender
+appender.file.type = FILE
+appender.file.fileName = ${sys:log.dir}/mvn-${sys:mvn.forkNumber:-output}.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n
+appender.file.createOnDemand = true
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = ERROR
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
new file mode 100644
index 000..2048fd1
--- /dev/null
+++ b/tools/maven/checkstyle.xml
@@ -0,0 +1,562 @@
+
+
+http://www.puppycrawl.com/dtds/configuration_1_3.dtd";>
+
+
+
+
+
+   
+   
+   
+   
+
+   
+   
+   
+   
+   
+   
+
+   
+   
+   
+   
+   
+
+   
+   
+   
+   
+   
+
+   
+   
+   
+   
+
+   
+   
+   
+
+   
+   
+
+   
+
+   
+   
+   
+
+   
+   
+
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+   
+   
+   
+   
+   
+
+   
+   
+   
+   
+   
+
+   
+   
+   
+   
+   
+
+   
+
+   
+   
+   
+  

(flink-connector-kudu) 10/44: [BAHIR-209] upgrade kudu version to 1.10.0 (#61)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 02deb5127a163c32e75d2c3ed23614c0934f109e
Author: Joao Boto 
AuthorDate: Thu Jul 4 00:11:57 2019 +0200

[BAHIR-209] upgrade kudu version to 1.10.0 (#61)
---
 flink-connector-kudu/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index a504b89..bd42c55 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
   jar
 
   
-1.9.0
+1.10.0
 
 1.10.19
 !DockerTest



(flink-connector-kudu) branch main updated (d837674 -> 8674446)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git


from d837674  Initial commit
 new 427098d  [FLINK-34930] Initialize tools/ and vcs.xml
 new 663ffd1  [FLINK-34930] Remove Python CI
 new 37a3e55  [BAHIR-99] kudu connector
 new 701fabe  [BAHIR-179] Fail Docker integration tests silently
 new f0cd1d9  [BAHIR-194] bump kudu version to 1.8.0
 new 247dfe9  [BAHIR-180] Improve eventual consistence for Kudu connector
 new 1e26505  [BAHIR-199] Bump kudu version to 1.9.0
 new f9c75bb  [BAHIR-202] Improve KuduSink throughput using async FlushMode
 new a1b4da5  [BAHIR-200] Move tests from docker to kudu-test-utils (#49)
 new 02deb51  [BAHIR-209] upgrade kudu version to 1.10.0 (#61)
 new 47133a3  [BAHIR-207] Add tests for scala 2.12 on travis (#59)
 new 73e87f9  [BAHIR-214] Improve speed and solve eventual consistence 
issues (#64)
 new c380e40  Add support "upsert part of columns of a kudu table" (#70)
 new 9b99c18  Fix "the client has already been closed" bug (#75)
 new 6da6ad7  Fix NotSerializableException in Kudu connector (#74)
 new 3c57f2e  Kudu Connector rework (#78)
 new 6e8b66f  Add batch table env support and filter push down to Kudu 
connector (#82)
 new 3f9db63  BAHIR-240: replace docker test by testcontainer (#89)
 new 7b5d717  [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
 new 4a5ec77  [BAHIR-263] Update flink to 1.12.2 (#115)
 new 6500475  [BAHIR-260] Add kudu table writer config  (#109)
 new 03f0312  [BAHIR-293] Fix documentation tables
 new b9cd23e  [BAHIR-291] Bump flink to 1.14.0 (#136)
 new 7700edb  [BAHIR-296] Unify log4j libs
 new 1dce829  BAHIR-296: Unify mockito version to 1.10.19
 new e9c2a76  [BAHIR-302] Add enforcers
 new 6832f05  [BAHIR-243] Change KuduTestHarness with TestContainers
 new 285a6e1  [BAHIR-302] Group declaration of flink dependencies on parent 
pom
 new af8dff4  [maven-release-plugin] prepare for next development iteration
 new aca6e7b  [BAHIR-305] Kudu Flink SQL Support 
DynamicSource/Sink&LookupFunction
 new bef51ab  [BAHIR-312] Add license header to README.md files
 new 2e75a60  [BAHIR-321] Make KuduFilterInfo handle String literals
 new 5a1da31  [BAHIR-308] Remove support for scala 2.11
 new 6ff53e7  [BAHIR-308] Bump flink version to 1.15.3
 new 78c76c5  [BAHIR-308] Remove scala prefix where we can
 new d8c803a  [BAHIR-324] Closing KuduReader at JobManager
 new 0335544  [FLINK-34930] Adapt POM files to the new structure, remove 
flink-shaded Guava usage
 new abf6eba  [FLINK-34930] Apply spotless code format and checkstyle rules
 new 34cb67f  [FLINK-34930] Rename base package
 new abc4a3b  [FLINK-34930] Fix KuduTableSourceITCase
 new 9253b9d  [FLINK-34930] State Bahir fork
 new c7dfb67  [FLINK-34930] Skip spotless for JDK 21+
 new 5230ec1  [FLINK-34930] Enable module opens for tests for newer JDKs
 new 8674446  [FLINK-34930] Migrate Bahir NOTICE

The 44 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/push_pr.yml  |   7 -
 .gitmodules|   4 +
 .idea/vcs.xml  |  25 +
 NOTICE |   6 +-
 README.md  |   6 +-
 flink-connector-kudu/README.md | 324 
 flink-connector-kudu/pom.xml   |  97 
 .../kudu/connector/ColumnSchemasFactory.java   |  43 ++
 .../kudu/connector/CreateTableOptionsFactory.java  |  42 ++
 .../connector/kudu/connector/KuduFilterInfo.java   | 199 
 .../connector/kudu/connector/KuduTableInfo.java| 127 +
 .../connector/converter/RowResultConverter.java|  41 ++
 .../connector/converter/RowResultRowConverter.java |  46 ++
 .../converter/RowResultRowDataConverter.java   | 104 
 .../failure/DefaultKuduFailureHandler.java |  38 ++
 .../kudu/connector/failure/KuduFailureHandler.java |  54 ++
 .../kudu/connector/reader/KuduInputSplit.java  |  46 ++
 .../kudu/connector/reader/KuduReader.java  | 198 
 .../kudu/connector/reader/KuduReaderConfig.java|  89 
 .../kudu/connector/reader/KuduReaderIterator.java  |  68 +++
 .../writer/AbstractSingleOperationMapper.java  | 108 
 .../kudu/connector/writer/KuduOperationMapper.java |  46 ++
 .../kudu/connector/writer/KuduWriter.java  | 168 ++
 .../kudu/connector/writer/KuduWriterConfig.java| 206 
 .../kudu/connector/writer/PojoOperationMapper.java |  86 
 .../writer/RowDa

(flink-connector-kudu) 29/44: [maven-release-plugin] prepare for next development iteration

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit af8dff48d082bf8f340e181a8c5719993de47eef
Author: Joao Boto 
AuthorDate: Thu Jun 16 15:44:53 2022 +0200

[maven-release-plugin] prepare for next development iteration
---
 flink-connector-kudu/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 6f68522..fd485d5 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.bahir</groupId>
     <artifactId>bahir-flink-parent_2.11</artifactId>
-    <version>1.1-SNAPSHOT</version>
+    <version>1.2-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 



(flink-connector-kudu) 09/44: [BAHIR-200] Move tests from docker to kudu-test-utils (#49)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit a1b4da53f4a73eaa22e5dbb4d22e0ed897ec4f0a
Author: Joao Boto 
AuthorDate: Sun May 26 02:45:25 2019 +0200

[BAHIR-200] Move tests from docker to kudu-test-utils (#49)
---
 flink-connector-kudu/dockers/docker-compose.yml| 92 --
 flink-connector-kudu/dockers/role/Dockerfile   | 41 --
 .../dockers/role/docker-entrypoint.sh  | 69 
 flink-connector-kudu/dockers/run_kudu_tests.sh | 68 
 flink-connector-kudu/dockers/start-images.sh   | 42 --
 flink-connector-kudu/dockers/stop-images.sh| 33 
 flink-connector-kudu/pom.xml   | 42 +++---
 .../connectors/kudu/connector/KuduFilterInfo.java  |  4 +-
 .../streaming/connectors/kudu/DockerTest.java  | 31 
 .../connectors/kudu/KuduInputFormatTest.java   |  8 +-
 .../connectors/kudu/KuduOuputFormatTest.java   | 13 +--
 .../streaming/connectors/kudu/KuduSinkTest.java| 29 +--
 .../connectors/kudu/connector/KuduDatabase.java| 16 +++-
 13 files changed, 82 insertions(+), 406 deletions(-)

diff --git a/flink-connector-kudu/dockers/docker-compose.yml 
b/flink-connector-kudu/dockers/docker-compose.yml
deleted file mode 100644
index d2c95bb..000
--- a/flink-connector-kudu/dockers/docker-compose.yml
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-version: '2'
-
-services:
-
-  kudu-master:
-image: eskabetxe/kudu
-container_name: kudu-master
-hostname: 172.25.0.6
-ports:
-  - "8051:8051"
-volumes:
-  - /var/lib/kudu/master
-command: master
-networks:
-  mynet:
-ipv4_address: 172.25.0.6
-
-  kudu-server1:
-image: eskabetxe/kudu
-container_name: kudu-server1
-hostname: 172.25.0.7
-environment:
-  - KUDU_MASTER=172.25.0.6
-ports:
-  - "8054:8050"
-volumes:
-  - /var/lib/kudu/server
-command: tserver
-networks:
-  mynet:
-ipv4_address: 172.25.0.7
-links:
-  - kudu-master
-
-  kudu-server2:
-image: eskabetxe/kudu
-container_name: kudu-server2
-hostname: 172.25.0.8
-environment:
-  - KUDU_MASTER=172.25.0.6
-ports:
-  - "8052:8050"
-volumes:
-  - /var/lib/kudu/server
-command: tserver
-networks:
-  mynet:
-ipv4_address: 172.25.0.8
-links:
-  - kudu-master
-
-  kudu-server3:
-image: eskabetxe/kudu
-container_name: kudu-server3
-hostname: 172.25.0.9
-environment:
-  - KUDU_MASTER=172.25.0.6
-ports:
-  - "8053:8050"
-volumes:
-  - /var/lib/kudu/server
-command: tserver
-networks:
-  mynet:
-ipv4_address: 172.25.0.9
-links:
-  - kudu-master
-
-networks:
-  mynet:
-driver: bridge
-ipam:
-  config:
-- subnet: 172.25.0.0/24
-  IPRange: 172.25.0.2/24,
-  gateway: 172.25.0.1
diff --git a/flink-connector-kudu/dockers/role/Dockerfile 
b/flink-connector-kudu/dockers/role/Dockerfile
deleted file mode 100644
index b14b087..000
--- a/flink-connector-kudu/dockers/role/Dockerfile
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-FROM bitnami/minideb:jessie
-MAINTAINER eskabetxe
-
-RUN set -x \
-&& apt-get update \
-&& apt-get

(flink-connector-kudu) 08/44: [BAHIR-202] Improve KuduSink throughput using async FlushMode

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit f9c75bbc99f190fd562215ee1dfef53a36241cda
Author: SuXingLee <913910...@qq.com>
AuthorDate: Fri Mar 22 20:08:33 2019 +0800

[BAHIR-202] Improve KuduSink throughput using async FlushMode

By default, KuduSink processes messages one by one
without checkpointing. When checkpointing is enabled, throughput
is improved by using FlushMode.AUTO_FLUSH_BACKGROUND,
and checkpoints are used to ensure at-least-once semantics.
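
A hedged usage sketch of the two switches this commit adds (sink construction elided; the flush-mode names come from the diff below):

    kuduSink.withSyncFlushMode();  // AUTO_FLUSH_SYNC: every write is flushed
                                   // before processing continues - safer, slower
    kuduSink.withAsyncFlushMode(); // AUTO_FLUSH_BACKGROUND: writes are batched
                                   // and flushed in the background; checkpoints
                                   // provide the at-least-once guarantee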

Closes #50
---
 .../connectors/kudu/KuduOutputFormat.java  |  5 +-
 .../flink/streaming/connectors/kudu/KuduSink.java  | 59 --
 .../connectors/kudu/connector/KuduConnector.java   | 18 +--
 3 files changed, 73 insertions(+), 9 deletions(-)

diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
index 9d12710..c1301da 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
 import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,8 @@ import java.io.IOException;
 
 public class KuduOutputFormat extends RichOutputFormat {
 
+private static final long serialVersionUID = 1L;
+
 private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
 
 private String kuduMasters;
@@ -87,7 +90,7 @@ public class KuduOutputFormat extends RichOutputFormat {
 @Override
 public void open(int taskNumber, int numTasks) throws IOException {
 if (connector != null) return;
-connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, FlushMode.AUTO_FLUSH_SYNC);
 serializer = serializer.withSchema(tableInfo.getSchema());
 }
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
index 53cf249..b6dd9c8 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
@@ -17,18 +17,25 @@
 package org.apache.flink.streaming.connectors.kudu;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
 import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class KuduSink extends RichSinkFunction {
+public class KuduSink extends RichSinkFunction implements CheckpointedFunction {
+
+private static final long serialVersionUID = 1L;

 private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
 
@@ -36,6 +43,7 @@ public class KuduSink extends RichSinkFunction {
 private KuduTableInfo tableInfo;
 private KuduConnector.Consistency consistency;
 private KuduConnector.WriteMode writeMode;
+private FlushMode flushMode;
 
 private KuduSerialization serializer;
 
@@ -77,11 +85,42 @@ public class KuduSink extends RichSinkFunction {
 return this;
 }
 
+public KuduSink withSyncFlushMode() {
+this.flushMode = FlushMode.AUTO_FLUSH_SYNC;
+return this;
+}
+
+public KuduSink withAsyncFlushMode() {
+this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+return this;
+}
+
 @Override
 public void open(Configuration parameters) throws IOException {
-if (connector != null) return;

(flink-connector-kudu) 23/44: [BAHIR-291] Bump flink to 1.14.0 (#136)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit b9cd23e729e0d874b7fd53e86a7890f73b939567
Author: Roc Marshal 
AuthorDate: Mon Dec 27 01:27:08 2021 +0800

[BAHIR-291] Bump flink to 1.14.0 (#136)
---
 flink-connector-kudu/pom.xml   | 2 +-
 .../java/org/apache/flink/connectors/kudu/table/KuduCatalog.java   | 4 ++--
 .../org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java | 7 ++-
 .../org/apache/flink/connectors/kudu/table/KuduTableFactory.java   | 2 +-
 .../org/apache/flink/connectors/kudu/table/KuduTableSource.java| 2 +-
 .../apache/flink/connectors/kudu/table/utils/KuduTableUtils.java   | 2 +-
 6 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index a76102e..ac6cdc5 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -51,7 +51,7 @@

     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index 2ca7c0e..d8343e8 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -42,7 +42,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.AlterTableOptions;
@@ -237,7 +237,7 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
 
 @Override
 public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
-Map<String, String> tableProperties = table.getProperties();
+Map<String, String> tableProperties = table.getOptions();
 TableSchema tableSchema = table.getSchema();

 Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS));
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
index 30aaa40..2458a56 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
@@ -31,9 +31,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-
 /**
  * Factory for {@link KuduCatalog}.
  */
@@ -45,8 +42,8 @@ public class KuduCatalogFactory implements CatalogFactory {
 @Override
 public Map<String, String> requiredContext() {
 Map<String, String> context = new HashMap<>();
-context.put(CATALOG_TYPE, KuduTableFactory.KUDU);
-context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
+context.put("type", KuduTableFactory.KUDU);
+context.put("property-version", "1"); // backwards compatibility
 return context;
 }
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 524f521..a2883af 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -132,7 +132,7 @@ public class KuduTableFactory implements TableSourceFactory, TableSinkFactory {
 public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) {
 validateTable(table);
 String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
-return createTableSource(tableName, table.getSchema(), table.getProperties());
+return createTableSource(tableName, table.getSchema(), table.getOptions());
 }
 
 private KuduTableSource createTableSource(String tableName, TableSchema 
schema, Map props) {
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
 
b/flink-connector-kudu/src/main

(flink-connector-kudu) 24/44: [BAHIR-296] Unify log4j libs

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 7700edb6a18d08cf2f93f84d4377807af1c826bd
Author: Joao Boto 
AuthorDate: Sun Dec 26 19:39:56 2021 +0100

[BAHIR-296] Unify log4j libs
---
 flink-connector-kudu/pom.xml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index ac6cdc5..12fd9da 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -104,19 +104,19 @@
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
-      <version>${log4j2.version}</version>
+      <version>${log4j.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-core</artifactId>
-      <version>${log4j2.version}</version>
+      <version>${log4j.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
-      <version>${log4j2.version}</version>
+      <version>${log4j.version}</version>
       <scope>test</scope>
     </dependency>



(flink-connector-kudu) 14/44: Fix "the client has already been closed" bug (#75)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 9b99c1862cad3c26b0d287c1fb13e15b588b1392
Author: WangJ1an <826204...@qq.com>
AuthorDate: Mon Jan 20 04:20:34 2020 +0800

Fix "the client has already been closed" bug (#75)

Fix the problem "Caused by: java.lang.IllegalStateException:
Cannot proceed, the client has already been closed" when
running in Flink local mode
---
 .../java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
index 3a35e6a..98877d8 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
@@ -98,6 +98,7 @@ public class KuduInputFormat extends RichInputFormat {
 }
 if (kuduReader != null) {
 kuduReader.close();
+kuduReader = null;
 }
 }
 



(flink-connector-kudu) 15/44: Fix NotSerializableException in Kudu connector (#74)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 6da6ad7ad427b23392d3bcbbd43391c05846ad5e
Author: Darcy <331046...@qq.com>
AuthorDate: Mon Jan 20 04:22:46 2020 +0800

Fix NotSerializableException in Kudu connector (#74)
---
 .../org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index c37bc9a..c7ae4a4 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -21,10 +21,11 @@ import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduPredicate;
 
+import java.io.Serializable;
 import java.util.List;
 
 @PublicEvolving
-public class KuduFilterInfo {
+public class KuduFilterInfo implements Serializable {
 
 private String column;
 private FilterType type;



(flink-connector-kudu) 04/44: [BAHIR-179] Fail Docker integration tests silently

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 701fabeeb897f13cfcde2e55c8a035c1589e74fc
Author: eskabetxe 
AuthorDate: Wed Jan 9 17:29:32 2019 +0100

[BAHIR-179] Fail Docker integration tests silently

When running Docker-based integration tests locally,
fail silently if the environment requirements are not available.

Closes #38
Closes #35
---
 flink-connector-kudu/pom.xml   | 42 --
 .../streaming/connectors/kudu/DockerTest.java  | 31 
 .../connectors/kudu/KuduInputFormatTest.java   |  4 +--
 .../connectors/kudu/KuduOuputFormatTest.java   |  3 +-
 .../streaming/connectors/kudu/KuduSinkTest.java|  2 +-
 .../src/test/resources/log4j.properties| 27 ++
 6 files changed, 67 insertions(+), 42 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 348371b..61ab4a6 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -31,7 +31,8 @@
 
   
 1.7.1
-5.2.0
+
+!DockerTest
   
 
   
@@ -58,45 +59,14 @@
   test
 
 
-
-  org.junit.jupiter
-  junit-jupiter-api
-  ${junit.version}
-  test
-
-
   
 
   
 
-  default
-  
-true
-  
-  
-
-  
-org.apache.maven.plugins
-maven-surefire-plugin
-
-  
-**/*Test.java
-  
-
-  
-
-  
-
-
-  test-kudu
-  
-
-  
-org.apache.maven.plugins
-maven-surefire-plugin
-  
-
-  
+  docker-test
+  
+DockerTest
+  
 
   
 
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java
new file mode 100644
index 000..070e634
--- /dev/null
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.junit.jupiter.api.Tag;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ ElementType.TYPE, ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+@Tag("DockerTest")
+public @interface DockerTest {
+}
+
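
The pieces fit together as follows: tests annotated with @DockerTest carry the JUnit 5 tag "DockerTest", the surefire default excludes that tag (the "!DockerTest" value in the pom above), and the docker-test profile flips the exclusion off. A hedged usage sketch with standard Maven commands:

    mvn test                 # default build: DockerTest-tagged tests are skipped
    mvn test -Pdocker-test   # profile selects the DockerTest group, running them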
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
index 8cfc102..eb9dc00 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
@@ -26,11 +26,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+@DockerTest
 public class KuduInputFormatTest extends KuduDatabase {
 
-
-
-
 @Test
 public void testInvalidKuduMaster() throws IOException {
 KuduTableInfo tableInfo = booksTableInfo("books",false);
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
index 6eb5ebe..e282185 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
@@ -26,10 +26,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
+@DockerTest
 public class KuduOuputFormatTest extends KuduDatabase {
 
-
-
 @Test
 public void testInvalidKuduMaster() throws IOException {
 Ku

(flink-connector-kudu) 22/44: [BAHIR-293] Fix documentation tables

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 03f031288d7f97338e55a8534e59bc473bcfdea1
Author: Joao Boto 
AuthorDate: Tue Dec 7 11:09:26 2021 +0100

[BAHIR-293] Fix documentation tables
---
 flink-connector-kudu/README.md | 25 +
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 6370aa6..a0e0234 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -154,18 +154,19 @@ The example uses lambda expressions to implement the functional interfaces.
 Read more about Kudu schema design in the [Kudu 
docs](https://kudu.apache.org/docs/schema_design.html).
 
 ### Supported data types
-| Flink/SQL | Kudu   | 
-| - |:-:| 
-|STRING | STRING| 
-| BOOLEAN   |BOOL   | 
-| TINYINT   |   INT8| 
-| SMALLINT  |  INT16| 
-| INT   |  INT32| 
-| BIGINT|   INT64 |
-| FLOAT |  FLOAT  |
-| DOUBLE|DOUBLE|
-| BYTES|BINARY|
-| TIMESTAMP(3) |UNIXTIME_MICROS |
+
+| Flink/SQL| Kudu|
+|--|:---:|
+| `STRING` | STRING  |
+| `BOOLEAN`| BOOL|
+| `TINYINT`| INT8|
+| `SMALLINT`   | INT16   |
+| `INT`| INT32   |
+| `BIGINT` | INT64   |
+| `FLOAT`  | FLOAT   |
+| `DOUBLE` | DOUBLE  |
+| `BYTES`  | BINARY  |
+| `TIMESTAMP(3)`   | UNIXTIME_MICROS |
 
 Note:
 * `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java 
conversion class is `java.sql.Timestamp` 



(flink-connector-kudu) 02/44: [FLINK-34930] Remove Python CI

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 663ffd1a45b9e675058d708ef4f45546736a985f
Author: Ferenc Csaky 
AuthorDate: Thu Apr 11 20:53:49 2024 +0200

[FLINK-34930] Remove Python CI
---
 .github/workflows/push_pr.yml | 7 ---
 1 file changed, 7 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 20a..72b98a2 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -36,10 +36,3 @@ jobs:
 with:
   flink_version: ${{ matrix.flink }}
   jdk_version: ${{ matrix.jdk }}
-  python_test:
-strategy:
-  matrix:
-flink: [ 1.17.2, 1.18.1, 1.19-SNAPSHOT ]
-uses: 
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
-with:
-  flink_version: ${{ matrix.flink }}
\ No newline at end of file



(flink-connector-kudu) 07/44: [BAHIR-199] Bump kudu version to 1.9.0

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 1e26505253818da3a5176281f7c5264978e1c71e
Author: Joao Boto 
AuthorDate: Thu Mar 7 21:51:06 2019 +0100

[BAHIR-199] Bump kudu version to 1.9.0

Closes #48
---
 flink-connector-kudu/README.md | 2 +-
 flink-connector-kudu/pom.xml   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index af2985b..9b75aa7 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -9,7 +9,7 @@ following dependency to your project:
   1.1-SNAPSHOT
 
 
-*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* 
(last stable version).
+*Version Compatibility*: This module is compatible with Apache Kudu *1.9.0* 
(last stable version).
 
 Note that the streaming connectors are not part of the binary distribution of 
Flink. You need to link them into your job jar for cluster execution.
 See how to link with them for cluster execution 
[here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html).
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 61ab4a6..d51341b 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
   jar
 
   
-1.7.1
+1.9.0
 
 !DockerTest
   



(flink-connector-kudu) 13/44: Add support "upsert part of columns of a kudu table" (#70)

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit c380e400d4c81c83dabf7ff42f4a5bb193492704
Author: Darcy <331046...@qq.com>
AuthorDate: Sat Nov 30 04:02:56 2019 +0800

Add support "upsert part of columns of a kudu table" (#70)

Sometimes we don't want to upsert all columns of a Kudu table,
so we need to support upserting only a subset of its columns.
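
A hedged sketch of what the change enables (assuming a KuduRow constructor taking the row arity, mirroring Flink's Row): columns that are never set on the row are skipped by the writer instead of being nulled out.

    KuduRow row = new KuduRow(2);     // arity is illustrative
    row.setField(0, "id", 1001);      // key column
    row.setField(1, "price", 19.99);  // the only non-key column to change
    // "title", "author", "quantity" are not set, so hasField(...) returns
    // false for them and the upsert leaves their current values untouched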
---
 .../flink/connectors/kudu/connector/KuduRow.java   |  4 
 .../kudu/connector/writer/KuduWriter.java  |  5 -
 .../connectors/kudu/batch/KuduOuputFormatTest.java |  2 ++
 .../connectors/kudu/connector/KuduDatabase.java| 26 --
 .../connectors/kudu/streaming/KuduSinkTest.java|  3 ++-
 5 files changed, 36 insertions(+), 4 deletions(-)

diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
index af78361..78e6e6e 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
@@ -41,6 +41,10 @@ public class KuduRow extends Row {
 return super.getField(rowNames.get(name));
 }
 
+public boolean hasField(String name) {
+return rowNames.get(name) != null;
+}
+
 public void setField(int pos, String name, Object value) {
 super.setField(pos, value);
 this.rowNames.put(name, pos);
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index f4e2a8a..57c0741 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -152,7 +152,10 @@ public class KuduWriter implements AutoCloseable {
 
 table.getSchema().getColumns().forEach(column -> {
 String columnName = column.getName();
-Object value = row.getField(column.getName());
+if (!row.hasField(columnName)) {
+return;
+}
+Object value = row.getField(columnName);
 
 if (value == null) {
 partialRow.setNull(columnName);
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
index 963a8c0..f14eaa0 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
@@ -71,6 +71,7 @@ class KuduOuputFormatTest extends KuduDatabase {
 
 List<KuduRow> rows = readRows(tableInfo);
 Assertions.assertEquals(5, rows.size());
+kuduRowsTest(rows);
 
 cleanDatabase(tableInfo);
 }
@@ -99,6 +100,7 @@ class KuduOuputFormatTest extends KuduDatabase {
 
 List<KuduRow> rows = readRows(tableInfo);
 Assertions.assertEquals(5, rows.size());
+kuduRowsTest(rows);
 
 cleanDatabase(tableInfo);
 }
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
index 3d02a1d..cda8c21 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
@@ -56,14 +56,16 @@ public class KuduDatabase {
 .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
 .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
 .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
-.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).asNullable().build())
+.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).asNullable().build())
 .build();
 }
 
 protected static List<KuduRow> booksDataRow() {
 return Arrays.stream(booksTableData)
 .map(row -> {
+Integer rowId = (Integer)row[0];
+if (rowId % 2 == 1) {
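
For context, this commit means a KuduRow no longer has to carry every table column: KuduWriter now calls row.hasField(columnName) before reading a value and skips columns the row does not define, rather than overwriting them with null. Below is a minimal sketch of a partial upsert against the "books" test schema, assuming KuduRow keeps Row's arity-based constructor; the key value and price are illustrative, not taken from the commit:

```
import org.apache.flink.connectors.kudu.connector.KuduRow;

public class PartialUpsertSketch {
    public static void main(String[] args) {
        // A row defining only two of the five "books" columns.
        KuduRow partial = new KuduRow(2);
        partial.setField(0, "id", 1001);      // key column: required to locate the row
        partial.setField(1, "price", 19.99);  // the only value column being updated

        // With this commit, KuduWriter skips the undefined columns
        // ("title", "author", "quantity") instead of setting them to null.
        System.out.println(partial.hasField("price"));  // true
        System.out.println(partial.hasField("title"));  // false
    }
}
```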
 

(flink-connector-kudu) 03/44: [BAHIR-99] kudu connector

2024-05-14 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 37a3e55b5253027c5f475ef5e3c7c8faa42ffebe
Author: Joao Boto 
AuthorDate: Wed Jul 25 20:17:36 2018 +0200

[BAHIR-99] kudu connector
---
 flink-connector-kudu/README.md |  98 +
 flink-connector-kudu/dockers/docker-compose.yml|  92 +
 flink-connector-kudu/dockers/role/Dockerfile   |  41 
 .../dockers/role/docker-entrypoint.sh  |  69 +++
 flink-connector-kudu/dockers/run_kudu_tests.sh |  68 +++
 flink-connector-kudu/dockers/start-images.sh   |  42 
 flink-connector-kudu/dockers/stop-images.sh|  33 
 flink-connector-kudu/pom.xml   | 103 ++
 .../streaming/connectors/kudu/KuduInputFormat.java | 218 +
 .../connectors/kudu/KuduOutputFormat.java  | 110 +++
 .../flink/streaming/connectors/kudu/KuduSink.java  | 106 ++
 .../connectors/kudu/connector/KuduColumnInfo.java  | 161 +++
 .../connectors/kudu/connector/KuduConnector.java   | 133 +
 .../connectors/kudu/connector/KuduFilterInfo.java  | 173 
 .../connectors/kudu/connector/KuduMapper.java  | 146 ++
 .../connectors/kudu/connector/KuduRow.java | 137 +
 .../connectors/kudu/connector/KuduTableInfo.java   | 133 +
 .../connectors/kudu/KuduInputFormatTest.java   |  91 +
 .../connectors/kudu/KuduOuputFormatTest.java   |  93 +
 .../streaming/connectors/kudu/KuduSinkTest.java|  89 +
 .../connectors/kudu/connector/KuduDatabase.java|  89 +
 21 files changed, 2225 insertions(+)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
new file mode 100644
index 000..af2985b
--- /dev/null
+++ b/flink-connector-kudu/README.md
@@ -0,0 +1,98 @@
+# Flink Kudu Connector
+
+This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). To use this connector, add the
+following dependency to your project:
+
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-kudu_2.11</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+
+*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version).
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html).
+
+## Installing Kudu
+
+Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
+Optionally, you can use the Docker images provided in the dockers folder.
+
+## KuduInputFormat
+
+```
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+.create("books")
+.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+.build();
+
+// Pass the tableInfo to the KuduInputFormat and provide the Kudu master address
+env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo))
+.count();
+
+env.execute();
+```
+
+## KuduOutputFormat
+
+```
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+.create("books")
+.createIfNotExist(true)
+.replicas(1)
+.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+.build();
+
+...
+
+env.fromCollection(books)
+.output(new KuduOutputFormat<>("172.25.0.6", tableInfo));
+
+env.execute();
+```
+
+## KuduSink
+
+```
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Bui