(flink) branch release-1.18 updated: [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 1e1a7f16b6f [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() 1e1a7f16b6f is described below commit 1e1a7f16b6f272334d9f9a1053b657148151a789 Author: Sergey Nuyanzin AuthorDate: Wed May 15 07:37:47 2024 +0200 [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() Co-authored-by: Sergey Nuyanzin This closes apache#19797 Co-authored-by: chenzihao --- .../aggfunctions/RankLikeAggFunctionBase.java | 2 +- .../planner/plan/utils/AggFunctionFactory.scala| 17 +++--- .../plan/batch/sql/agg/OverAggregateTest.xml | 44 .../plan/batch/sql/agg/OverAggregateTest.scala | 13 + .../runtime/stream/sql/OverAggregateITCase.scala | 60 ++ 5 files changed, 129 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java index 2a556d7b741..898939aedb9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java @@ -99,7 +99,7 @@ public abstract class RankLikeAggFunctionBase extends DeclarativeAggregateFuncti equalTo(lasValue, operand(i))); } Optional ret = Arrays.stream(orderKeyEquals).reduce(ExpressionBuilder::and); -return ret.orElseGet(() -> literal(true)); +return ret.orElseGet(() -> literal(false)); } protected Expression generateInitLiteral(LogicalType orderType) { diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index cbabddae03a..0f1005e51f0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -529,18 +529,23 @@ class AggFunctionFactory( } private def createRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new RankAggFunction(argTypes) +new RankAggFunction(getArgTypesOrEmpty()) } private def createDenseRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new DenseRankAggFunction(argTypes) +new DenseRankAggFunction(getArgTypesOrEmpty()) } private def createPercentRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new PercentRankAggFunction(argTypes) +new PercentRankAggFunction(getArgTypesOrEmpty()) + } + + private def getArgTypesOrEmpty(): Array[LogicalType] = { +if (orderKeyIndexes != null) { + orderKeyIndexes.map(inputRowType.getChildren.get(_)) +} else { + Array[LogicalType]() +} } private def createNTILEAggFUnction(argTypes: Array[LogicalType]): UserDefinedFunction = { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml index 909efe170f7..0ca5ec28442 100644 --- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml @@ -280,6 +280,50 @@ OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDE ]]> + + + + + + + + + + + + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/O
(flink) branch release-1.19 updated: [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new 190522c2c05 [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() 190522c2c05 is described below commit 190522c2c051e0ec05213be71fb7a59a517353b1 Author: Sergey Nuyanzin AuthorDate: Wed May 15 07:36:10 2024 +0200 [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() Co-authored-by: Sergey Nuyanzin This closes apache#19797 Co-authored-by: chenzihao --- .../aggfunctions/RankLikeAggFunctionBase.java | 2 +- .../planner/plan/utils/AggFunctionFactory.scala| 17 +++--- .../plan/batch/sql/agg/OverAggregateTest.xml | 44 .../plan/batch/sql/agg/OverAggregateTest.scala | 13 + .../runtime/stream/sql/OverAggregateITCase.scala | 60 ++ 5 files changed, 129 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java index 2a556d7b741..898939aedb9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java @@ -99,7 +99,7 @@ public abstract class RankLikeAggFunctionBase extends DeclarativeAggregateFuncti equalTo(lasValue, operand(i))); } Optional ret = Arrays.stream(orderKeyEquals).reduce(ExpressionBuilder::and); -return ret.orElseGet(() -> literal(true)); +return ret.orElseGet(() -> literal(false)); } protected Expression generateInitLiteral(LogicalType orderType) { diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index 861f537f6c2..fd44945dccf 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -529,18 +529,23 @@ class AggFunctionFactory( } private def createRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new RankAggFunction(argTypes) +new RankAggFunction(getArgTypesOrEmpty()) } private def createDenseRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new DenseRankAggFunction(argTypes) +new DenseRankAggFunction(getArgTypesOrEmpty()) } private def createPercentRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new PercentRankAggFunction(argTypes) +new PercentRankAggFunction(getArgTypesOrEmpty()) + } + + private def getArgTypesOrEmpty(): Array[LogicalType] = { +if (orderKeyIndexes != null) { + orderKeyIndexes.map(inputRowType.getChildren.get(_)) +} else { + Array[LogicalType]() +} } private def createNTILEAggFUnction(argTypes: Array[LogicalType]): UserDefinedFunction = { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml index 909efe170f7..0ca5ec28442 100644 --- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml @@ -280,6 +280,50 @@ OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDE ]]> + + + + + + + + + + + + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/O
(flink) branch master updated: [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 40fb49dd17b [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() 40fb49dd17b is described below commit 40fb49dd17b3e1b6c5aa0249514273730ebe9226 Author: chenzihao AuthorDate: Tue May 14 22:18:05 2024 +0200 [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank() Co-authored-by: Sergey Nuyanzin This closes apache#19797 --- .../aggfunctions/RankLikeAggFunctionBase.java | 2 +- .../planner/plan/utils/AggFunctionFactory.scala| 17 +++--- .../plan/batch/sql/agg/OverAggregateTest.xml | 44 .../plan/batch/sql/agg/OverAggregateTest.scala | 13 + .../runtime/stream/sql/OverAggregateITCase.scala | 60 ++ 5 files changed, 129 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java index 2a556d7b741..898939aedb9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java @@ -99,7 +99,7 @@ public abstract class RankLikeAggFunctionBase extends DeclarativeAggregateFuncti equalTo(lasValue, operand(i))); } Optional ret = Arrays.stream(orderKeyEquals).reduce(ExpressionBuilder::and); -return ret.orElseGet(() -> literal(true)); +return ret.orElseGet(() -> literal(false)); } protected Expression generateInitLiteral(LogicalType orderType) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index 4ecd4363863..6ca84314fc7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -532,18 +532,23 @@ class AggFunctionFactory( } private def createRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new RankAggFunction(argTypes) +new RankAggFunction(getArgTypesOrEmpty()) } private def createDenseRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new DenseRankAggFunction(argTypes) +new DenseRankAggFunction(getArgTypesOrEmpty()) } private def createPercentRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = { -val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_)) -new PercentRankAggFunction(argTypes) +new PercentRankAggFunction(getArgTypesOrEmpty()) + } + + private def getArgTypesOrEmpty(): Array[LogicalType] = { +if (orderKeyIndexes != null) { + orderKeyIndexes.map(inputRowType.getChildren.get(_)) +} else { + Array[LogicalType]() +} } private def createNTILEAggFUnction(argTypes: Array[LogicalType]): UserDefinedFunction = { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml index 909efe170f7..0ca5ec28442 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml @@ -280,6 +280,50 @@ OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDE ]]> + + + + + + + + + + + + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala b/flink-table/flink-table-pla
(flink) 01/04: [FLINK-35193][table] Support drop materialized table syntax
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 8551ef39e0387f723a72299cc73aaaf827cf74bf Author: Feng Jin AuthorDate: Mon May 13 20:06:41 2024 +0800 [FLINK-35193][table] Support drop materialized table syntax --- .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 30 ++ .../sql/parser/ddl/SqlDropMaterializedTable.java | 68 ++ .../flink/sql/parser/utils/ParserResource.java | 3 + .../MaterializedTableStatementParserTest.java | 25 5 files changed, 127 insertions(+) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 81b3412954c..883b6aec1b2 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -76,6 +76,7 @@ "org.apache.flink.sql.parser.ddl.SqlDropCatalog" "org.apache.flink.sql.parser.ddl.SqlDropDatabase" "org.apache.flink.sql.parser.ddl.SqlDropFunction" +"org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable" "org.apache.flink.sql.parser.ddl.SqlDropPartitions" "org.apache.flink.sql.parser.ddl.SqlDropPartitions.AlterTableDropPartitionsContext" "org.apache.flink.sql.parser.ddl.SqlDropTable" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index bdc97818914..b2a5ea02d0f 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1801,6 +1801,34 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar } } +/** +* Parses a DROP MATERIALIZED TABLE statement. 
+*/ +SqlDrop SqlDropMaterializedTable(Span s, boolean replace, boolean isTemporary) : +{ +SqlIdentifier tableName = null; +boolean ifExists = false; +} +{ + + { + if (isTemporary) { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.dropTemporaryMaterializedTableUnsupported()); + } + } + + +ifExists = IfExistsOpt() + +tableName = CompoundIdentifier() + +{ +return new SqlDropMaterializedTable(s.pos(), tableName, ifExists); +} +} + /** * Parses alter materialized table. */ @@ -2427,6 +2455,8 @@ SqlDrop SqlDropExtended(Span s, boolean replace) : ( drop = SqlDropCatalog(s, replace) | +drop = SqlDropMaterializedTable(s, replace, isTemporary) +| drop = SqlDropTable(s, replace, isTemporary) | drop = SqlDropView(s, replace, isTemporary) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java new file mode 100644 index 000..ec9439fb13a --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlDrop; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** DROP MATERIALIZED TABLE DDL sql call. */ +public class SqlDropMaterializedTable extends SqlDrop { + +private static final SqlOperator OPERATOR = +new SqlSpecialOperator("DROP MATERIALIZED TABLE", SqlKind.DROP_TABLE); + +private final SqlIdentifier tableIdentifier; + +public SqlDropMaterializedTable( +
(flink) 03/04: [FLINK-35193][table] Support execution of drop materialized table
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 51b744bca1bdf53385152ed237f2950525046488 Author: Feng Jin AuthorDate: Mon May 13 20:08:38 2024 +0800 [FLINK-35193][table] Support execution of drop materialized table --- .../MaterializedTableManager.java | 115 +- .../service/operation/OperationExecutor.java | 9 + .../service/MaterializedTableStatementITCase.java | 241 ++--- .../apache/flink/table/catalog/CatalogManager.java | 4 +- 4 files changed, 328 insertions(+), 41 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index b4ba12b8755..a51b1885c98 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.materializedtable; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; @@ -34,6 +35,7 @@ import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.service.operation.OperationExecutor; import org.apache.flink.table.gateway.service.result.ResultFetcher; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.operations.command.DescribeJobOperation; import 
org.apache.flink.table.operations.command.StopJobOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; @@ -93,6 +95,9 @@ public class MaterializedTableManager { } else if (op instanceof AlterMaterializedTableResumeOperation) { return callAlterMaterializedTableResume( operationExecutor, handle, (AlterMaterializedTableResumeOperation) op); +} else if (op instanceof DropMaterializedTableOperation) { +return callDropMaterializedTableOperation( +operationExecutor, handle, (DropMaterializedTableOperation) op); } throw new SqlExecutionException( @@ -146,8 +151,7 @@ public class MaterializedTableManager { materializedTableIdentifier, e); operationExecutor.callExecutableOperation( -handle, -new DropMaterializedTableOperation(materializedTableIdentifier, true, false)); +handle, new DropMaterializedTableOperation(materializedTableIdentifier, true)); throw e; } } @@ -170,7 +174,8 @@ public class MaterializedTableManager { materializedTable.getSerializedRefreshHandler(), operationExecutor.getSessionContext().getUserClassloader()); -String savepointPath = stopJobWithSavepoint(operationExecutor, handle, refreshHandler); +String savepointPath = +stopJobWithSavepoint(operationExecutor, handle, refreshHandler.getJobId()); ContinuousRefreshHandler updateRefreshHandler = new ContinuousRefreshHandler( @@ -183,9 +188,12 @@ public class MaterializedTableManager { CatalogMaterializedTable.RefreshStatus.SUSPENDED, materializedTable.getRefreshHandlerDescription().orElse(null), serializeContinuousHandler(updateRefreshHandler)); +List tableChanges = new ArrayList<>(); +tableChanges.add( + TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED)); AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = new AlterMaterializedTableChangeOperation( -tableIdentifier, Collections.emptyList(), 
updatedMaterializedTable); +tableIdentifier, tableChanges, updatedMaterializedTable); operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation); @@ -284,8 +292,7 @@ public class MaterializedTableManager { // drop materialized table while submit flink streaming job occur exception. Thus, weak // atomicity is guaranteed
(flink) 04/04: [FLINK-35342][table] Fix MaterializedTableStatementITCase test can check for wrong status
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 94d861b08fef1e350d80a3f5f0f63168d327bc64 Author: Feng Jin AuthorDate: Tue May 14 11:18:40 2024 +0800 [FLINK-35342][table] Fix MaterializedTableStatementITCase test can check for wrong status --- .../service/MaterializedTableStatementITCase.java| 20 +++- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 105c51ea597..dd7d25e124c 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -272,7 +272,7 @@ public class MaterializedTableStatementITCase { waitUntilAllTasksAreRunning( restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); -// check the background job is running +// verify the background job is running String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId()); OperationHandle describeJobHandle = service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration()); @@ -653,7 +653,7 @@ public class MaterializedTableStatementITCase { assertThat(suspendMaterializedTable.getRefreshStatus()) .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED); -// check background job is stopped +// verify background job is stopped byte[] refreshHandler = suspendMaterializedTable.getSerializedRefreshHandler(); ContinuousRefreshHandler suspendRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( @@ -667,7 +667,7 @@ public class MaterializedTableStatementITCase { 
List jobResults = fetchAllResults(service, sessionHandle, describeJobHandle); assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("FINISHED"); -// check savepoint is created +// verify savepoint is created assertThat(suspendRefreshHandler.getRestorePath()).isNotEmpty(); String actualSavepointPath = suspendRefreshHandler.getRestorePath().get(); @@ -692,7 +692,17 @@ public class MaterializedTableStatementITCase { assertThat(resumedCatalogMaterializedTable.getRefreshStatus()) .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); -// check background job is running +waitUntilAllTasksAreRunning( +restClusterClient, +JobID.fromHexString( +ContinuousRefreshHandlerSerializer.INSTANCE +.deserialize( +resumedCatalogMaterializedTable +.getSerializedRefreshHandler(), +getClass().getClassLoader()) +.getJobId())); + +// verify background job is running refreshHandler = resumedCatalogMaterializedTable.getSerializedRefreshHandler(); ContinuousRefreshHandler resumeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( @@ -706,7 +716,7 @@ public class MaterializedTableStatementITCase { jobResults = fetchAllResults(service, sessionHandle, describeResumeJobHandle); assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); -// check resumed job is restored from savepoint +// verify resumed job is restored from savepoint Optional actualRestorePath = getJobRestoreSavepointPath(restClusterClient, resumeJobId); assertThat(actualRestorePath).isNotEmpty();
(flink) 02/04: [FLINK-35193][table] Support convert drop materialized table node to operation
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit fd333941553c68c36e1460102ab023f80a5b1362 Author: Feng Jin AuthorDate: Mon May 13 20:07:39 2024 +0800 [FLINK-35193][table] Support convert drop materialized table node to operation --- .../DropMaterializedTableOperation.java| 6 ++-- .../SqlDropMaterializedTableConverter.java | 41 ++ .../operations/converters/SqlNodeConverters.java | 1 + ...erializedTableNodeToOperationConverterTest.java | 21 +++ 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java index e5eee557bfc..46dd86ad96b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java @@ -33,9 +33,8 @@ import java.util.Map; public class DropMaterializedTableOperation extends DropTableOperation implements MaterializedTableOperation { -public DropMaterializedTableOperation( -ObjectIdentifier tableIdentifier, boolean ifExists, boolean isTemporary) { -super(tableIdentifier, ifExists, isTemporary); +public DropMaterializedTableOperation(ObjectIdentifier tableIdentifier, boolean ifExists) { +super(tableIdentifier, ifExists, false); } @Override @@ -43,7 +42,6 @@ public class DropMaterializedTableOperation extends DropTableOperation Map params = new LinkedHashMap<>(); params.put("identifier", getTableIdentifier()); params.put("IfExists", isIfExists()); -params.put("isTemporary", isTemporary()); return OperationUtils.formatWithChildren( "DROP 
MATERIALIZED TABLE", diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java new file mode 100644 index 000..6501dc0c453 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; + +/** A converter for {@link SqlDropMaterializedTable}. 
*/ +public class SqlDropMaterializedTableConverter +implements SqlNodeConverter { +@Override +public Operation convertSqlNode( +SqlDropMaterializedTable sqlDropMaterializedTable, ConvertContext context) { +UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlDropMaterializedTable.fullTableName()); +ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); +// Currently we don't support temporary materialized table, so isTemporary is always false +return new DropMaterializedTableOperation( +identifier, sqlDropMaterializedTable.getIfExists()); +} +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
(flink) branch master updated (65d31e26534 -> 94d861b08fe)
This is an automated email from the ASF dual-hosted git repository. ron pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 65d31e26534 [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state. new 8551ef39e03 [FLINK-35193][table] Support drop materialized table syntax new fd333941553 [FLINK-35193][table] Support convert drop materialized table node to operation new 51b744bca1b [FLINK-35193][table] Support execution of drop materialized table new 94d861b08fe [FLINK-35342][table] Fix MaterializedTableStatementITCase test can check for wrong status The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../MaterializedTableManager.java | 115 - .../service/operation/OperationExecutor.java | 9 + .../service/MaterializedTableStatementITCase.java | 261 ++--- .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 30 +++ ...pCatalog.java => SqlDropMaterializedTable.java} | 40 ++-- .../flink/sql/parser/utils/ParserResource.java | 3 + .../MaterializedTableStatementParserTest.java | 25 ++ .../apache/flink/table/catalog/CatalogManager.java | 4 +- .../DropMaterializedTableOperation.java| 6 +- ...java => SqlDropMaterializedTableConverter.java} | 20 +- .../operations/converters/SqlNodeConverters.java | 1 + ...erializedTableNodeToOperationConverterTest.java | 21 ++ 13 files changed, 455 insertions(+), 81 deletions(-) copy flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/{SqlDropCatalog.java => SqlDropMaterializedTable.java} (68%) copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/{SqlAlterMaterializedTableSuspendConverter.java => SqlDropMaterializedTableConverter.java} (59%)
(flink) branch master updated: [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state.
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 65d31e26534 [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state. 65d31e26534 is described below commit 65d31e26534836909f6b8139c6bd6cd45b91bba4 Author: JunRuiLee AuthorDate: Fri Feb 2 18:01:03 2024 +0800 [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state. --- .../DefaultShuffleMasterSnapshotContext.java | 22 + .../shuffle/EmptyShuffleMasterSnapshot.java| 37 ++ .../flink/runtime/shuffle/NettyShuffleMaster.java | 12 +++ .../flink/runtime/shuffle/ShuffleMaster.java | 19 +++ .../runtime/shuffle/ShuffleMasterSnapshot.java | 31 ++ .../shuffle/ShuffleMasterSnapshotContext.java | 22 + 6 files changed, 143 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/DefaultShuffleMasterSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/DefaultShuffleMasterSnapshotContext.java new file mode 100644 index 000..8fcff36eef1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/DefaultShuffleMasterSnapshotContext.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +/** Default {@link ShuffleMasterSnapshotContext} implementation. */ +public class DefaultShuffleMasterSnapshotContext implements ShuffleMasterSnapshotContext {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/EmptyShuffleMasterSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/EmptyShuffleMasterSnapshot.java new file mode 100644 index 000..683ccd07968 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/EmptyShuffleMasterSnapshot.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +/** + * A singleton implementation of {@link ShuffleMasterSnapshot} that represents an empty snapshot of + * shuffle master. 
+ */ +public class EmptyShuffleMasterSnapshot implements ShuffleMasterSnapshot { + +private static final EmptyShuffleMasterSnapshot INSTANCE = new EmptyShuffleMasterSnapshot(); + +@Override +public boolean isIncremental() { +return false; +} + +public static EmptyShuffleMasterSnapshot getInstance() { +return INSTANCE; +} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java index 9cce16cf495..461457a29b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java @@ -187,4 +187,16 @@ public class NettyShuffleMaster implements ShuffleMaster public void unregisterJob(JobID jobId) { jobShuffleContexts.remove(jobId); } + +@Override +public boolean supportsBatchSnapshot() { +return true; +} + +@Override +public void snapshotState( +CompletableFuture
(flink) branch master updated: [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9a5a99b1a30 [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762) 9a5a99b1a30 is described below commit 9a5a99b1a30054268bbde36d565cbb1b81018890 Author: Yanfei Lei AuthorDate: Tue May 14 16:21:03 2024 +0800 [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762) --- .../filemerging/FileMergingSnapshotManager.java| 75 ++ .../FileMergingSnapshotManagerBase.java| 49 ++ .../checkpoint/filemerging/LogicalFile.java| 1 + .../checkpoint/filemerging/PhysicalFile.java | 32 +++-- ...ssCheckpointFileMergingSnapshotManagerTest.java | 30 - .../FileMergingSnapshotManagerTestBase.java| 47 ++ ...inCheckpointFileMergingSnapshotManagerTest.java | 30 - ...FileMergingCheckpointStateOutputStreamTest.java | 5 +- 8 files changed, 247 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index add8806369c..f3523c4430f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; import java.io.Closeable; import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; /** @@ -268,4 +269,78 @@ public interface FileMergingSnapshotManager extends Closeable { "%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism); } } + +/** Space usage statistics of a 
managed directory. */ +final class SpaceStat { + +AtomicLong physicalFileCount; +AtomicLong physicalFileSize; + +AtomicLong logicalFileCount; +AtomicLong logicalFileSize; + +public SpaceStat() { +this(0, 0, 0, 0); +} + +public SpaceStat( +long physicalFileCount, +long physicalFileSize, +long logicalFileCount, +long logicalFileSize) { +this.physicalFileCount = new AtomicLong(physicalFileCount); +this.physicalFileSize = new AtomicLong(physicalFileSize); +this.logicalFileCount = new AtomicLong(logicalFileCount); +this.logicalFileSize = new AtomicLong(logicalFileSize); +} + +public void onLogicalFileCreate(long size) { +physicalFileSize.addAndGet(size); +logicalFileSize.addAndGet(size); +logicalFileCount.incrementAndGet(); +} + +public void onLogicalFileDelete(long size) { +logicalFileSize.addAndGet(-size); +logicalFileCount.decrementAndGet(); +} + +public void onPhysicalFileCreate() { +physicalFileCount.incrementAndGet(); +} + +public void onPhysicalFileDelete(long size) { +physicalFileSize.addAndGet(-size); +physicalFileCount.decrementAndGet(); +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +SpaceStat spaceStat = (SpaceStat) o; +return physicalFileCount.get() == spaceStat.physicalFileCount.get() +&& physicalFileSize.get() == spaceStat.physicalFileSize.get() +&& logicalFileCount.get() == spaceStat.logicalFileCount.get() +&& logicalFileSize.get() == spaceStat.logicalFileSize.get(); +} + +@Override +public String toString() { +return "SpaceStat{" ++ "physicalFileCount=" ++ physicalFileCount.get() ++ ", physicalFileSize=" ++ physicalFileSize.get() ++ ", logicalFileCount=" ++ logicalFileCount.get() ++ ", logicalFileSize=" ++ logicalFileSize.get() ++ '}'; +} +} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index 1e1abdcaf64..
(flink) branch release-1.18 updated: [FLINK-35098][ORC] Fix incorrect results with literal first expressions
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 1f604da2dfc [FLINK-35098][ORC] Fix incorrect results with literal first expressions 1f604da2dfc is described below commit 1f604da2dfc831d04826a20b3cb272d2ad9dfb56 Author: Andrey Gaskov <31715230+empath...@users.noreply.github.com> AuthorDate: Tue May 14 14:10:56 2024 +0700 [FLINK-35098][ORC] Fix incorrect results with literal first expressions --- .../main/java/org/apache/flink/orc/OrcFilters.java | 8 +-- .../apache/flink/orc/OrcFileSystemFilterTest.java | 58 +++--- .../org/apache/flink/orc/OrcFileSystemITCase.java | 11 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java index 4393356fc30..46eb9af0927 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java @@ -83,28 +83,28 @@ public class OrcFilters { convertBinary( call, OrcFilters::convertGreaterThan, - OrcFilters::convertLessThanEquals)) + OrcFilters::convertLessThan)) .put( BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, call -> convertBinary( call, OrcFilters::convertGreaterThanEquals, - OrcFilters::convertLessThan)) + OrcFilters::convertLessThanEquals)) .put( BuiltInFunctionDefinitions.LESS_THAN, call -> convertBinary( call, OrcFilters::convertLessThan, - OrcFilters::convertGreaterThanEquals)) + OrcFilters::convertGreaterThan)) .put( BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, call -> convertBinary( call, OrcFilters::convertLessThanEquals, - OrcFilters::convertGreaterThan)) + OrcFilters::convertGreaterThanEquals)) .build(); private static boolean isRef(Expression expression) { 
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java index cdee400723d..f5d9ed33423 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java @@ -25,13 +25,13 @@ import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type.LONG; import static org.assertj.core.api.Assertions.assertThat; /** Unit Tests for {@link OrcFileFormatFactory}. */ @@ -39,7 +39,7 @@ class OrcFileSystemFilterTest { @Test @SuppressWarnings("unchecked") -public void testApplyPredicate() { +void testApplyPredicate() { List args = new ArrayList<>(); // equal @@ -53,8 +53,7 @@ class OrcFileSystemFilterTest { CallExpression.permanent( BuiltInFunctionDefinitions.EQUALS, args, DataTypes.BOOLEAN()); OrcFilters.Predicate predicate1 = OrcFilters.toOrcPredicate(equalExpression); -OrcFilters.Predicate predicate2 = -new OrcFilters.
(flink) branch release-1.19 updated: [FLINK-35098][ORC] Fix incorrect results with literal first expressions
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new e16da86dfb1 [FLINK-35098][ORC] Fix incorrect results with literal first expressions e16da86dfb1 is described below commit e16da86dfb1fbeee541cd9dfccd5f5f4520b7396 Author: Andrey Gaskov <31715230+empath...@users.noreply.github.com> AuthorDate: Tue May 14 14:10:24 2024 +0700 [FLINK-35098][ORC] Fix incorrect results with literal first expressions --- .../main/java/org/apache/flink/orc/OrcFilters.java | 8 +-- .../apache/flink/orc/OrcFileSystemFilterTest.java | 58 +++--- .../org/apache/flink/orc/OrcFileSystemITCase.java | 11 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java index 4393356fc30..46eb9af0927 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java @@ -83,28 +83,28 @@ public class OrcFilters { convertBinary( call, OrcFilters::convertGreaterThan, - OrcFilters::convertLessThanEquals)) + OrcFilters::convertLessThan)) .put( BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, call -> convertBinary( call, OrcFilters::convertGreaterThanEquals, - OrcFilters::convertLessThan)) + OrcFilters::convertLessThanEquals)) .put( BuiltInFunctionDefinitions.LESS_THAN, call -> convertBinary( call, OrcFilters::convertLessThan, - OrcFilters::convertGreaterThanEquals)) + OrcFilters::convertGreaterThan)) .put( BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, call -> convertBinary( call, OrcFilters::convertLessThanEquals, - OrcFilters::convertGreaterThan)) + OrcFilters::convertGreaterThanEquals)) .build(); private static boolean isRef(Expression expression) { 
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java index cdee400723d..f5d9ed33423 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java @@ -25,13 +25,13 @@ import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type.LONG; import static org.assertj.core.api.Assertions.assertThat; /** Unit Tests for {@link OrcFileFormatFactory}. */ @@ -39,7 +39,7 @@ class OrcFileSystemFilterTest { @Test @SuppressWarnings("unchecked") -public void testApplyPredicate() { +void testApplyPredicate() { List args = new ArrayList<>(); // equal @@ -53,8 +53,7 @@ class OrcFileSystemFilterTest { CallExpression.permanent( BuiltInFunctionDefinitions.EQUALS, args, DataTypes.BOOLEAN()); OrcFilters.Predicate predicate1 = OrcFilters.toOrcPredicate(equalExpression); -OrcFilters.Predicate predicate2 = -new OrcFilters.Eq
(flink) branch master updated: [FLINK-35098][ORC] Fix incorrect results with literal first expressions
This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4165bac27bd [FLINK-35098][ORC] Fix incorrect results with literal first expressions 4165bac27bd is described below commit 4165bac27bda4457e5940a994d923242d4a271dc Author: Andrey Gaskov <31715230+empath...@users.noreply.github.com> AuthorDate: Tue May 14 14:09:51 2024 +0700 [FLINK-35098][ORC] Fix incorrect results with literal first expressions --- .../main/java/org/apache/flink/orc/OrcFilters.java | 8 +-- .../apache/flink/orc/OrcFileSystemFilterTest.java | 58 +++--- .../org/apache/flink/orc/OrcFileSystemITCase.java | 11 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java index 4393356fc30..46eb9af0927 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java @@ -83,28 +83,28 @@ public class OrcFilters { convertBinary( call, OrcFilters::convertGreaterThan, - OrcFilters::convertLessThanEquals)) + OrcFilters::convertLessThan)) .put( BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, call -> convertBinary( call, OrcFilters::convertGreaterThanEquals, - OrcFilters::convertLessThan)) + OrcFilters::convertLessThanEquals)) .put( BuiltInFunctionDefinitions.LESS_THAN, call -> convertBinary( call, OrcFilters::convertLessThan, - OrcFilters::convertGreaterThanEquals)) + OrcFilters::convertGreaterThan)) .put( BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, call -> convertBinary( call, OrcFilters::convertLessThanEquals, - OrcFilters::convertGreaterThan)) + OrcFilters::convertGreaterThanEquals)) .build(); private static boolean isRef(Expression expression) { diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java index cdee400723d..f5d9ed33423 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java @@ -25,13 +25,13 @@ import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type.LONG; import static org.assertj.core.api.Assertions.assertThat; /** Unit Tests for {@link OrcFileFormatFactory}. */ @@ -39,7 +39,7 @@ class OrcFileSystemFilterTest { @Test @SuppressWarnings("unchecked") -public void testApplyPredicate() { +void testApplyPredicate() { List args = new ArrayList<>(); // equal @@ -53,8 +53,7 @@ class OrcFileSystemFilterTest { CallExpression.permanent( BuiltInFunctionDefinitions.EQUALS, args, DataTypes.BOOLEAN()); OrcFilters.Predicate predicate1 = OrcFilters.toOrcPredicate(equalExpression); -OrcFilters.Predicate predicate2 = -new OrcFilters.Equals("long1"
(flink-connector-kudu) branch main updated: [hotfix] Remove non-existing branches for Weekly tests
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git The following commit(s) were added to refs/heads/main by this push: new 2b82430 [hotfix] Remove non-existing branches for Weekly tests 2b82430 is described below commit 2b82430280d347193d8ba26a844a3c4344365964 Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com> AuthorDate: Tue May 14 09:09:38 2024 +0200 [hotfix] Remove non-existing branches for Weekly tests --- .github/workflows/weekly.yml | 18 +++--- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index f3210c0..7b87f4d 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -30,9 +30,6 @@ jobs: strategy: matrix: flink_branches: [{ - flink: 1.17-SNAPSHOT, - branch: main -}, { flink: 1.18-SNAPSHOT, jdk: '8, 11, 17', branch: main @@ -41,18 +38,9 @@ jobs: jdk: '8, 11, 17, 21', branch: main }, { - flink: 1.17.2, - branch: v3.1 -}, { - flink: 1.18.1, - jdk: '8, 11, 17', - branch: v3.1 -}, { - flink: 1.17.2, - branch: v3.0 -}, { - flink: 1.18.1, - branch: v3.0 + flink: 1.20-SNAPSHOT, + jdk: '8, 11, 17, 21', + branch: main }] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with:
(flink-connector-kudu) branch main updated: [FLINK-34961] Use dedicated CI name for JDBC connector to differentiate it in infra-reports
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git The following commit(s) were added to refs/heads/main by this push: new 4098566 [FLINK-34961] Use dedicated CI name for JDBC connector to differentiate it in infra-reports 4098566 is described below commit 409856698df1bf630c58f6e761d45e4dc8f06ad5 Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com> AuthorDate: Tue May 14 09:04:00 2024 +0200 [FLINK-34961] Use dedicated CI name for JDBC connector to differentiate it in infra-reports --- .github/workflows/push_pr.yml | 5 - .github/workflows/weekly.yml | 5 - 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 72b98a2..da2f077 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -16,7 +16,10 @@ # limitations under the License. -name: CI +# We need to specify repo related information here since Apache INFRA doesn't differentiate +# between several workflows with the same names while preparing a report for GHA usage +# https://infra-reports.apache.org/#ghactions +name: Flink Connector Kudu CI on: [push, pull_request] concurrency: group: ${{ github.workflow }}-${{ github.ref }} diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index aaa729f..f3210c0 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -16,7 +16,10 @@ # limitations under the License. -name: Nightly +# We need to specify repo related information here since Apache INFRA doesn't differentiate +# between several workflows with the same names while preparing a report for GHA usage +# https://infra-reports.apache.org/#ghactions +name: Weekly Flink Connector Kudu on: schedule: - cron: "0 0 * * 0"
(flink-connector-kudu) 40/44: [FLINK-34930] Fix KuduTableSourceITCase
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit abc4a3b965e54406eaddcf06261116916440c4c5 Author: Ferenc Csaky AuthorDate: Thu Mar 28 15:05:46 2024 +0100 [FLINK-34930] Fix KuduTableSourceITCase --- .../connector/kudu/table/KuduTableSourceITCase.java | 20 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java index 2468e77..7971bda 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduTableSourceITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,9 +37,11 @@ public class KuduTableSourceITCase extends KuduTestBase { private TableEnvironment tableEnv; private KuduCatalog catalog; +private KuduTableInfo tableInfo = null; + @BeforeEach -public void init() { -KuduTableInfo tableInfo = booksTableInfo("books", true); +void init() { +tableInfo = booksTableInfo("books", true); setUpDatabase(tableInfo); tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(); catalog = new KuduCatalog(getMasterAddress()); @@ -46,6 +49,14 @@ public class KuduTableSourceITCase extends KuduTestBase { tableEnv.useCatalog("kudu"); } +@AfterEach +void cleanup() { +if (tableInfo != null) { +cleanDatabase(tableInfo); +tableInfo = null; +} +} + @Test void testFullBatchScan() throws Exception { CloseableIterator it = @@ -53,7 +64,8 @@ public class 
KuduTableSourceITCase extends KuduTestBase { List results = new ArrayList<>(); it.forEachRemaining(results::add); assertEquals(5, results.size()); -assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString()); +assertEquals( +"+I[1001, Java for dummies, Tan Ah Teck, 11.11, 11]", results.get(0).toString()); tableEnv.executeSql("DROP TABLE books"); } @@ -68,7 +80,7 @@ public class KuduTableSourceITCase extends KuduTestBase { List results = new ArrayList<>(); it.forEachRemaining(results::add); assertEquals(1, results.size()); -assertEquals("More Java for more dummies", results.get(0).toString()); +assertEquals("+I[More Java for more dummies]", results.get(0).toString()); tableEnv.executeSql("DROP TABLE books"); } }
(flink-connector-kudu) 25/44: BAHIR-296: Unify mockito version to 1.10.19
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 1dce82988ac057da3ca3ee5dcb67df227cd86284 Author: Joao Boto AuthorDate: Mon Dec 27 16:36:32 2021 +0100 BAHIR-296: Unify mockito version to 1.10.19 --- flink-connector-kudu/pom.xml | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 12fd9da..750929c 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -31,7 +31,6 @@ 1.13.0 -1.10.19 @@ -97,8 +96,6 @@ org.mockito mockito-all - ${mockito.version} - test @@ -119,7 +116,7 @@ ${log4j.version} test - +
(flink-connector-kudu) 41/44: [FLINK-34930] State Bahir fork
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 9253b9d7b59abd34c7f0c87e6ad5673984ed03c9 Author: Ferenc Csaky AuthorDate: Tue Apr 2 16:57:16 2024 +0200 [FLINK-34930] State Bahir fork --- README.md | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1944eca..477aa84 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,10 @@ This repository contains the official Apache Flink Kudu connector. +## Forked from Bahir + +The connector code is forked from [Apache Bahir](https://bahir.apache.org/) project after its retirement. + ## Apache Flink Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities. @@ -65,4 +69,4 @@ This article describes [how to contribute to Apache Flink](https://flink.apache. ## About Apache Flink is an open source project of The Apache Software Foundation (ASF). -The Apache Flink project originated from the [Stratosphere](http://stratosphere.eu) research project. \ No newline at end of file +The Apache Flink project originated from the [Stratosphere](http://stratosphere.eu) research project.
(flink-connector-kudu) 42/44: [FLINK-34930] Skip spotless for JDK 21+
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit c7dfb67801ced03fe028e5f738fc2ced670440b4 Author: Ferenc Csaky AuthorDate: Mon Apr 8 13:40:05 2024 +0200 [FLINK-34930] Skip spotless for JDK 21+ --- pom.xml | 25 + 1 file changed, 25 insertions(+) diff --git a/pom.xml b/pom.xml index 6d8617a..69e2d74 100644 --- a/pom.xml +++ b/pom.xml @@ -321,4 +321,29 @@ under the License. + + + + java21 + + [21,) + + + + + + com.diffplug.spotless + spotless-maven-plugin + + + true + + + + + + +
(flink-connector-kudu) 17/44: Add batch table env support and filter push down to Kudu connector (#82)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 6e8b66f16b25802f5111f7b70a20e7d7a1ca5d65 Author: Sebastian Liu AuthorDate: Tue Jul 14 11:25:44 2020 +0800 Add batch table env support and filter push down to Kudu connector (#82) Update the KuduTableSource to inherit from InputFormatTableSource in order to support both streaming SQL and Batch SQL at the same time. In order to reduce unnecessary data transmission, the filter push down was also added to the KuduTableSource. --- flink-connector-kudu/README.md | 2 +- flink-connector-kudu/pom.xml | 2 +- .../connectors/kudu/connector/KuduFilterInfo.java | 14 +- .../flink/connectors/kudu/table/KuduCatalog.java | 5 +- .../connectors/kudu/table/KuduCatalogFactory.java | 4 +- .../connectors/kudu/table/KuduTableFactory.java| 2 +- .../connectors/kudu/table/KuduTableSource.java | 96 +-- .../kudu/table/utils/KuduTableUtils.java | 142 .../connectors/kudu/table/KuduCatalogTest.java | 2 +- .../kudu/table/KuduTableFactoryTest.java | 2 +- .../kudu/table/KuduTableSourceITCase.java | 67 .../connectors/kudu/table/KuduTableSourceTest.java | 181 + .../connectors/kudu/table/KuduTableTestUtils.java | 10 +- 13 files changed, 496 insertions(+), 33 deletions(-) diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md index 14c13eb..6370aa6 100644 --- a/flink-connector-kudu/README.md +++ b/flink-connector-kudu/README.md @@ -184,7 +184,7 @@ are described as being nullable, and not being primary keys. ## DataStream API -It is also possible to use the the Kudu connector directly from the DataStream API however we +It is also possible to use the Kudu connector directly from the DataStream API however we encourage all users to explore the Table API as it provides a lot of useful tooling when working with Kudu data. 
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index fe7887c..3bbefee 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -52,7 +52,7 @@ org.apache.flink - flink-table-planner-blink_2.11 + flink-table-planner-blink_${scala.binary.version} ${flink.version} provided diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java index 0a89cad..08fa86b 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java @@ -68,25 +68,25 @@ public class KuduFilterInfo implements Serializable { predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value); break; case FLOAT: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value); +predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) this.value); break; case INT8: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value); +predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte) this.value); break; case INT16: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value); +predicate = KuduPredicate.newComparisonPredicate(column, comparison, (short) this.value); break; case INT32: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value); +predicate = KuduPredicate.newComparisonPredicate(column, comparison, (int) this.value); break; case INT64: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value); +predicate = KuduPredicate.newComparisonPredicate(column, comparison, (long) this.value); break; case DOUBLE: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value); 
+predicate = KuduPredicate.newComparisonPredicate(column, comparison, (double) this.value); break; case BOOL: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value); +predicate = KuduPredicate.newComparisonPredicate(column, comparison, (boolean) this.value); break; case UNIXTIME_MICROS:
(flink-connector-kudu) 43/44: [FLINK-34930] Enable module opens for tests for newer JDKs
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 5230ec106551307d83d3998fbe0d08d35fc7d2f8 Author: Ferenc Csaky AuthorDate: Thu Apr 11 14:42:02 2024 +0200 [FLINK-34930] Enable module opens for tests for newer JDKs --- pom.xml | 4 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index 69e2d74..d1548f7 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,10 @@ under the License. flink-connector-kudu-parent + + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED +
(flink-connector-kudu) 44/44: [FLINK-34930] Migrate Bahir NOTICE
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 8674446dc0f1bab67528347ebfefc206bd380b6e Author: Ferenc Csaky AuthorDate: Wed Apr 17 11:26:04 2024 +0200 [FLINK-34930] Migrate Bahir NOTICE --- NOTICE | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/NOTICE b/NOTICE index c1e8320..c3ce37e 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ -Apache Flink Kudu Connector -Copyright 2014-2024 The Apache Software Foundation +Apache Bahir +Copyright (c) 2016-2024 The Apache Software Foundation. This product includes software developed at The Apache Software Foundation (http://www.apache.org/). @@ -12,5 +12,3 @@ ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUT DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -
(flink-connector-kudu) 34/44: [BAHIR-308] Bump flink version to 1.15.3
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 6ff53e7c4d40f3986983439f35083407693026b5 Author: Joao Boto AuthorDate: Wed Mar 8 17:11:35 2023 +0100 [BAHIR-308] Bump flink version to 1.15.3 --- flink-connector-kudu/pom.xml | 16 ++--- .../kudu/connector/ColumnSchemasFactory.java | 1 - .../kudu/connector/CreateTableOptionsFactory.java | 1 - .../connectors/kudu/connector/KuduFilterInfo.java | 1 - .../connectors/kudu/connector/KuduTableInfo.java | 3 +- .../convertor/RowResultRowDataConvertor.java | 6 +--- .../kudu/connector/failure/KuduFailureHandler.java | 1 - .../kudu/connector/reader/KuduReader.java | 10 ++ .../kudu/connector/reader/KuduReaderConfig.java| 3 +- .../writer/AbstractSingleOperationMapper.java | 1 - .../kudu/connector/writer/KuduOperationMapper.java | 1 - .../kudu/connector/writer/KuduWriter.java | 9 + .../kudu/connector/writer/KuduWriterConfig.java| 3 +- .../kudu/connector/writer/PojoOperationMapper.java | 6 +--- .../connectors/kudu/format/KuduOutputFormat.java | 1 - .../flink/connectors/kudu/streaming/KuduSink.java | 1 - .../kudu/table/AbstractReadOnlyCatalog.java| 22 ++-- .../flink/connectors/kudu/table/KuduCatalog.java | 35 --- .../connectors/kudu/table/KuduTableFactory.java| 28 +++ .../connectors/kudu/table/KuduTableSource.java | 13 ++- .../kudu/table/UpsertOperationMapper.java | 1 - .../kudu/table/dynamic/KuduDynamicTableSource.java | 40 +++--- .../table/dynamic/catalog/KuduDynamicCatalog.java | 32 +++-- .../kudu/table/utils/KuduTableUtils.java | 12 ++- .../connectors/kudu/table/utils/KuduTypeUtils.java | 14 +--- .../connectors/kudu/connector/KuduTestBase.java| 19 -- .../kudu/format/KuduOutputFormatTest.java | 3 +- .../connectors/kudu/streaming/KuduSinkTest.java| 1 - .../connectors/kudu/table/KuduCatalogTest.java | 1 - .../kudu/table/KuduTableFactoryTest.java | 11 ++ 
.../kudu/table/KuduTableSourceITCase.java | 4 +-- .../connectors/kudu/table/KuduTableSourceTest.java | 18 ++ .../connectors/kudu/table/KuduTableTestUtils.java | 4 +-- .../kudu/writer/AbstractOperationTest.java | 8 + .../kudu/writer/PojoOperationMapperTest.java | 3 +- .../kudu/writer/RowOperationMapperTest.java| 1 - .../kudu/writer/TupleOpertaionMapperTest.java | 1 - 37 files changed, 78 insertions(+), 257 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 20b16b4..134d6f7 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -36,13 +36,6 @@ - -org.apache.kudu -kudu-binary -${kudu.version} -${os.detected.classifier} -test - org.apache.kudu kudu-client @@ -85,11 +78,11 @@ org.apache.flink - flink-clients_${scala.binary.version} + flink-clients org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java org.apache.flink @@ -99,11 +92,6 @@ org.apache.flink flink-table-planner_${scala.binary.version} - - org.apache.kudu - kudu-binary - ${os.detected.classifier} - org.apache.kudu kudu-client diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java index b178308..4997938 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector; import org.apache.flink.annotation.PublicEvolving; - import org.apache.kudu.ColumnSchema; import java.io.Serializable; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java index 
4a475e9..fd9bfa4 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector; import org.apache.flink.annotation.PublicEvolving; - import org.apache.kudu.client.CreateTableOptions;
(flink-connector-kudu) 18/44: BAHIR-240: replace docker test by testcontainer (#89)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 3f9db639b956585ba4e2a7bc99557646d1fd53ff Author: Joao Boto AuthorDate: Tue Jul 28 17:41:33 2020 +0200 BAHIR-240: replace docker test by testcontainer (#89) --- flink-connector-kudu/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 3bbefee..bbf168e 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -32,7 +32,6 @@ 1.11.1 1.10.19 -!DockerTest
(flink-connector-kudu) 06/44: [BAHIR-180] Improve eventual consistence for Kudu connector
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 247dfe99486f04622b4ee7d83fa51ef253aaa75d Author: eskabetxe AuthorDate: Tue Jan 15 13:05:29 2019 +0100 [BAHIR-180] Improve eventual consistence for Kudu connector Closes #40 --- flink-connector-kudu/pom.xml | 2 +- .../streaming/connectors/kudu/KuduInputFormat.java | 27 ++--- .../connectors/kudu/KuduOutputFormat.java | 36 +++--- .../flink/streaming/connectors/kudu/KuduSink.java | 34 +++--- .../connectors/kudu/connector/KuduColumnInfo.java | 50 .../connectors/kudu/connector/KuduConnector.java | 81 - .../connectors/kudu/connector/KuduMapper.java | 59 + .../connectors/kudu/connector/KuduRow.java | 72 ++- .../connectors/kudu/connector/KuduRowIterator.java | 57 + .../connectors/kudu/serde/DefaultSerDe.java| 39 ++ .../connectors/kudu/serde/KuduDeserialization.java | 25 .../connectors/kudu/serde/KuduSerialization.java | 28 + .../streaming/connectors/kudu/serde/PojoSerDe.java | 134 + .../connectors/kudu/KuduOuputFormatTest.java | 11 +- .../streaming/connectors/kudu/KuduSinkTest.java| 11 +- .../connectors/kudu/connector/KuduDatabase.java| 2 +- .../connectors/kudu/serde/PojoSerDeTest.java | 57 + 17 files changed, 543 insertions(+), 182 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 5540fc1..61ab4a6 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -30,7 +30,7 @@ jar -1.8.0 +1.7.1 !DockerTest diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java index 617e317..fd126d0 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java +++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java @@ -24,7 +24,9 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.LocatableInputSplit; import org.apache.flink.streaming.connectors.kudu.connector.*; import org.apache.flink.util.Preconditions; -import org.apache.kudu.client.*; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanToken; +import org.apache.kudu.client.LocatedTablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +45,7 @@ public class KuduInputFormat extends RichInputFormat implements OutputFormat { +public class KuduOutputFormat extends RichOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); @@ -36,10 +37,12 @@ public class KuduOutputFormat implements OutputFormat private KuduConnector.Consistency consistency; private KuduConnector.WriteMode writeMode; -private transient KuduConnector tableContext; +private KuduSerialization serializer; +private transient KuduConnector connector; -public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) { + +public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization serializer) { Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); this.kuduMasters = kuduMasters; @@ -47,8 +50,10 @@ public class KuduOutputFormat implements OutputFormat this.tableInfo = tableInfo; this.consistency = KuduConnector.Consistency.STRONG; this.writeMode = KuduConnector.WriteMode.UPSERT; +this.serializer = serializer.withSchema(tableInfo.getSchema()); } + public KuduOutputFormat withEventualConsistency() { this.consistency = KuduConnector.Consistency.EVENTUAL; return this; @@ -81,28 +86,31 @@ public class KuduOutputFormat implements OutputFormat @Override public void open(int taskNumber, int numTasks) throws IOException { -startTableContext(); -} - -private void startTableContext() throws IOException { 
-if (tableContext != null) return; -tableContext = new KuduConnector(kuduMasters, tableInfo); +if (connector != null) return; +connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode); +serializer = serializer.withSchema(tableInfo.getSchema()); } @Override -public void writeRecord(OUT kuduRow) throws IOException { +public void writeRecord(OUT row) throws IOException { +boolean response; try { -tableContext.writeRow(kuduRow, consistency, writeMode); +KuduRow kuduRow = serializer.serialize(row); +response = connecto
(flink-connector-kudu) 11/44: [BAHIR-207] Add tests for scala 2.12 on travis (#59)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 47133a3e4873c0a4295ed4c002d805fbd3304408 Author: Joao Boto AuthorDate: Thu Jul 4 00:13:51 2019 +0200 [BAHIR-207] Add tests for scala 2.12 on travis (#59) --- .../apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java | 6 +++--- .../org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java| 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java index b9aaa40..4e91310 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java @@ -45,7 +45,7 @@ public class KuduOuputFormatTest extends KuduDatabase { public void testNotTableExist() throws IOException { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); -KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()); +KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()); Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1)); } @@ -54,7 +54,7 @@ public class KuduOuputFormatTest extends KuduDatabase { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); -KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()) +KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, 
new DefaultSerDe()) .withStrongConsistency(); outputFormat.open(0,1); @@ -74,7 +74,7 @@ public class KuduOuputFormatTest extends KuduDatabase { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); -KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()) +KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()) .withEventualConsistency(); outputFormat.open(0,1); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java index 83e060d..225bf7c 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java @@ -58,7 +58,7 @@ public class KuduSinkTest extends KuduDatabase { public void testNotTableExist() throws IOException { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); -KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()); +KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()); sink.setRuntimeContext(context); Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration())); } @@ -68,7 +68,7 @@ public class KuduSinkTest extends KuduDatabase { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); -KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) +KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) .withStrongConsistency(); sink.setRuntimeContext(context); sink.open(new 
Configuration()); @@ -88,7 +88,7 @@ public class KuduSinkTest extends KuduDatabase { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); -KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) +KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) .withEventualConsistency(); sink.setRuntimeContext(context); sink.open(new Configuration());
(flink-connector-kudu) 37/44: [FLINK-34930] Adapt POM files to the new structure, remove flink-shaded Guava usage
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 033554484423caa9ff1fbd830dc3b15b066b8463 Author: Ferenc Csaky AuthorDate: Thu Mar 28 12:30:27 2024 +0100 [FLINK-34930] Adapt POM files to the new structure, remove flink-shaded Guava usage --- flink-connector-kudu/pom.xml | 237 ++- .../flink/connectors/kudu/table/KuduCatalog.java | 3 +- .../connectors/kudu/table/KuduTableSource.java | 5 +- .../function/lookup/KuduRowDataLookupFunction.java | 4 +- .../kudu/table/utils/KuduTableUtils.java | 3 +- pom.xml| 324 + 6 files changed, 423 insertions(+), 153 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 8dc4f88..22817ff 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -1,148 +1,97 @@ -http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> - 4.0.0 - - -org.apache.bahir -bahir-flink-parent -1.2-SNAPSHOT -../pom.xml - - - flink-connector-kudu - jar - - flink-connector-kudu - - -1.13.0 - - - - - -org.apache.kudu -kudu-client -${kudu.version} - - -org.apache.kudu -kudu-test-utils -${kudu.version} -test - - -org.apache.logging.log4j -log4j-api -${log4j.version} -test - - -org.apache.logging.log4j -log4j-core -${log4j.version} -test - - -org.apache.logging.log4j -log4j-slf4j-impl -${log4j.version} -test - - - -org.junit.jupiter -junit-jupiter-migrationsupport -${junit.jupiter.version} -test - - - - - - - org.apache.flink - flink-clients - - - org.apache.flink - flink-streaming-java - - - org.apache.flink - flink-table-api-java-bridge - - - org.apache.flink - flink-table-common - - - org.apache.flink - flink-table-planner-loader - - - org.apache.flink - flink-table-runtime - - - org.apache.kudu - kudu-client - - - org.apache.kudu - 
kudu-test-utils - - - org.apache.logging.log4j - log4j-api - - - org.apache.logging.log4j - log4j-core - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - org.junit.jupiter - junit-jupiter-migrationsupport - - - org.mockito - mockito-all - - - org.testcontainers - testcontainers - - - - - - -kr.motd.maven -os-maven-plugin -1.6.2 - - - +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-connector-kudu-parent + 2.0-SNAPSHOT + + + flink-connector-kudu + Flink : Connectors : Kudu + jar + + + + org.apache.flink + flink-clients + + + + org.apache.flink + flink-streaming-java + + + + org.apache.flink + flink-table-api-java-bridge + + + + org.apache.flink + flink-table-common + + + + org.apache.flink + flink-table-planner-loader + + + + org.apache.
(flink-connector-kudu) 39/44: [FLINK-34930] Rename base package
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 34cb67fd62aefc03ed67ee75731ef2a2c82b1fff Author: Ferenc Csaky AuthorDate: Wed Apr 10 16:27:19 2024 +0200 [FLINK-34930] Rename base package --- .../kudu/connector/ColumnSchemasFactory.java | 2 +- .../kudu/connector/CreateTableOptionsFactory.java | 2 +- .../kudu/connector/KuduFilterInfo.java | 2 +- .../kudu/connector/KuduTableInfo.java | 2 +- .../connector/converter/RowResultConverter.java| 2 +- .../connector/converter/RowResultRowConverter.java | 2 +- .../converter/RowResultRowDataConverter.java | 2 +- .../failure/DefaultKuduFailureHandler.java | 2 +- .../kudu/connector/failure/KuduFailureHandler.java | 5 ++- .../kudu/connector/reader/KuduInputSplit.java | 2 +- .../kudu/connector/reader/KuduReader.java | 8 ++-- .../kudu/connector/reader/KuduReaderConfig.java| 7 ++-- .../kudu/connector/reader/KuduReaderIterator.java | 4 +- .../writer/AbstractSingleOperationMapper.java | 2 +- .../kudu/connector/writer/KuduOperationMapper.java | 2 +- .../kudu/connector/writer/KuduWriter.java | 8 ++-- .../kudu/connector/writer/KuduWriterConfig.java| 9 +++-- .../kudu/connector/writer/PojoOperationMapper.java | 2 +- .../writer/RowDataUpsertOperationMapper.java | 2 +- .../kudu/connector/writer/RowOperationMapper.java | 2 +- .../connector/writer/TupleOperationMapper.java | 2 +- .../kudu/format/AbstractKuduInputFormat.java | 18 - .../kudu/format/KuduOutputFormat.java | 14 +++ .../kudu/format/KuduRowDataInputFormat.java| 10 ++--- .../kudu/format/KuduRowInputFormat.java| 10 ++--- .../kudu/streaming/KuduSink.java | 14 +++ .../kudu/table/AbstractReadOnlyCatalog.java| 2 +- .../kudu/table/KuduCatalog.java| 25 ++--- .../kudu/table/KuduTableFactory.java | 10 ++--- .../kudu/table/KuduTableSink.java | 8 ++-- .../kudu/table/KuduTableSource.java| 17 - .../kudu/table/UpsertOperationMapper.java | 4 +- 
.../kudu/table/dynamic/KuduDynamicTableSink.java | 10 ++--- .../kudu/table/dynamic/KuduDynamicTableSource.java | 20 +- .../dynamic/KuduDynamicTableSourceSinkFactory.java | 12 +++--- .../table/dynamic/catalog/KuduCatalogFactory.java | 6 +-- .../table/dynamic/catalog/KuduDynamicCatalog.java | 18 - .../table/function/lookup/KuduLookupOptions.java | 2 +- .../function/lookup/KuduRowDataLookupFunction.java | 20 +- .../kudu/table/utils/KuduTableUtils.java | 43 +- .../kudu/table/utils/KuduTypeUtils.java| 2 +- .../org.apache.flink.table.factories.Factory | 2 +- .../org.apache.flink.table.factories.TableFactory | 4 +- .../kudu/connector/KuduFilterInfoTest.java | 2 +- .../kudu/connector/KuduTestBase.java | 22 +-- .../kudu/format/KuduOutputFormatTest.java | 12 +++--- .../kudu/format/KuduRowDataInputFormatTest.java| 16 .../kudu/format/KuduRowInputFormatTest.java| 12 +++--- .../kudu/streaming/KuduSinkTest.java | 12 +++--- .../kudu/table/KuduCatalogTest.java| 14 +++ .../kudu/table/KuduTableFactoryTest.java | 8 ++-- .../kudu/table/KuduTableSourceITCase.java | 6 +-- .../kudu/table/KuduTableSourceTest.java| 6 +-- .../kudu/table/KuduTableTestUtils.java | 2 +- .../kudu/table/dynamic/KuduDynamicSinkTest.java| 6 +-- .../kudu/table/dynamic/KuduDynamicSourceTest.java | 6 +-- .../dynamic/KuduRowDataLookupFunctionTest.java | 12 +++--- .../kudu/writer/AbstractOperationTest.java | 4 +- .../kudu/writer/PojoOperationMapperTest.java | 15 .../writer/RowDataUpsertOperationMapperTest.java | 6 +-- .../kudu/writer/RowOperationMapperTest.java| 8 ++-- .../kudu/writer/TupleOperationMapperTest.java | 8 ++-- 62 files changed, 264 insertions(+), 255 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/connector/ColumnSchemasFactory.java similarity index 96% rename from 
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/connector/ColumnSchemasFactory.java index 194af17..95fe97d 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/connector/ColumnSchemasFa
(flink-connector-kudu) 28/44: [BAHIR-302] Group declaration of flink dependencies on parent pom
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 285a6e133745d462fb1773d203026594a4cce802 Author: Joao Boto AuthorDate: Mon May 2 20:00:15 2022 +0200 [BAHIR-302] Group declaration of flink dependencies on parent pom --- flink-connector-kudu/pom.xml | 24 1 file changed, 24 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 9ace382..6f68522 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -36,30 +36,6 @@ - -org.apache.flink -flink-clients_${scala.binary.version} -${flink.version} -test - - -org.apache.flink -flink-streaming-java_${scala.binary.version} -${flink.version} -provided - - -org.apache.flink -flink-table-common -${flink.version} -provided - - -org.apache.flink -flink-table-planner_${scala.binary.version} -${flink.version} -provided - org.apache.kudu kudu-binary
(flink-connector-kudu) 35/44: [BAHIR-308] Remove scala prefix where we can
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 78c76c544ab426461e9e4156ebb9ddf561b7b97c Author: Joao Boto AuthorDate: Thu Mar 23 13:30:24 2023 +0100 [BAHIR-308] Remove scala prefix where we can --- flink-connector-kudu/pom.xml | 14 +++--- .../kudu/table/dynamic/KuduDynamicTableSource.java | 3 +-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 134d6f7..8dc4f88 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -20,12 +20,12 @@ org.apache.bahir -bahir-flink-parent_2.12 +bahir-flink-parent 1.2-SNAPSHOT ../pom.xml - flink-connector-kudu_2.12 + flink-connector-kudu jar flink-connector-kudu @@ -84,13 +84,21 @@ org.apache.flink flink-streaming-java + + org.apache.flink + flink-table-api-java-bridge + org.apache.flink flink-table-common org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner-loader + + + org.apache.flink + flink-table-runtime org.apache.kudu diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java index cde6a13..da6bebb 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; import java.util.*; -import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkArgument; import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly; /** @@ -143,7 +142,7 @@ public class KuduDynamicTableSource implements ScanTableSource, 
SupportsProjecti } private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) { -checkArgument( +Preconditions.checkArgument( containsPhysicalColumnsOnly(tableSchema), "Projection is only supported for physical columns."); TableSchema.Builder builder = TableSchema.builder();
(flink-connector-kudu) 20/44: [BAHIR-263] Update flink to 1.12.2 (#115)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 4a5ec773266d76568d89b5f291861307f64810fb Author: Joao Boto AuthorDate: Fri Mar 12 08:53:06 2021 +0100 [BAHIR-263] Update flink to 1.12.2 (#115) --- .../connectors/kudu/table/KuduCatalogTest.java | 16 ++--- .../kudu/table/KuduTableFactoryTest.java | 12 +- .../connectors/kudu/table/KuduTableSourceTest.java | 2 +- .../src/test/resources/log4j.properties| 27 ++ 4 files changed, 42 insertions(+), 15 deletions(-) diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java index 4bb1871..2bc8b12 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java @@ -77,7 +77,7 @@ public class KuduCatalogTest extends KuduTestBase { tableEnv.executeSql("INSERT INTO TestTable1 VALUES ('f', 's')") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); // Add this once Primary key support has been enabled @@ -101,7 +101,7 @@ public class KuduCatalogTest extends KuduTestBase { tableEnv.executeSql("INSERT INTO TestTable3 VALUES ('f', 2, 't')") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); validateMultiKey("TestTable3"); @@ -113,14 +113,14 @@ public class KuduCatalogTest extends KuduTestBase { tableEnv.executeSql("INSERT INTO TestTable5 VALUES ('s', 'f', 't')") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); tableEnv.executeSql("CREATE TABLE TestTable6 (`first` STRING, 
`second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); tableEnv.executeSql("INSERT INTO TestTable6 (SELECT `first`, `second` FROM TestTable5)") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); validateSingleKey("TestTable6"); @@ -133,12 +133,12 @@ public class KuduCatalogTest extends KuduTestBase { tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f','s')") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f2','s2')") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); Table result = tableEnv.sqlQuery("SELECT COUNT(*) FROM TestTableEP"); @@ -225,7 +225,7 @@ public class KuduCatalogTest extends KuduTestBase { tableEnv.executeSql("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); KuduTable kuduTable = harness.getClient().openTable("TestTableTsC"); @@ -252,7 +252,7 @@ public class KuduCatalogTest extends KuduTestBase { "TIMESTAMP '2020-04-15 12:34:56.123') ") .getJobClient() .get() -.getJobExecutionResult(getClass().getClassLoader()) +.getJobExecutionResult() .get(1, TimeUnit.MINUTES); validateManyTypes("TestTable8"); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java index d852f8e..d4de7f6 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java @@ -64,7 +64,7 @@ 
public class KuduTableFactoryTest extends KuduTestBase { "WITH ('connector.t
(flink-connector-kudu) 05/44: [BAHIR-194] bump kudu version to 1.8.0
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit f0cd1d9243abb110ce312a82adb106afe233e078 Author: Joao Boto AuthorDate: Sun Feb 10 23:38:43 2019 +0100 [BAHIR-194] bump kudu version to 1.8.0 Closes 44 --- flink-connector-kudu/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 61ab4a6..5540fc1 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -30,7 +30,7 @@ jar -1.7.1 +1.8.0 !DockerTest
(flink-connector-kudu) 36/44: [BAHIR-324] Closing KuduReader at JobManager
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit d8c803af8921d4c26865143f5ea3976f43a163f2 Author: Shimin Huang <40719512+coll...@users.noreply.github.com> AuthorDate: Wed May 31 00:37:14 2023 +0800 [BAHIR-324] Closing KuduReader at JobManager --- .../kudu/format/AbstractKuduInputFormat.java | 22 +++--- .../function/lookup/KuduRowDataLookupFunction.java | 2 +- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java index 0cdf570..4976241 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java @@ -104,19 +104,23 @@ public abstract class AbstractKuduInputFormat extends RichInputFormat extends RichInputFormat { ArrayList rows = new ArrayList<>(); for (KuduInputSplit inputSplit : inputSplits) { KuduReaderIterator scanner = kuduReader.scanner(inputSplit.getScanToken()); -// 没有启用cache +// not use cache if (cache == null) { while (scanner.hasNext()) { collect(scanner.next());
(flink-connector-kudu) 19/44: [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 7b5d717b609ca68cd8e812773585c2c295946619 Author: Gyula Fora AuthorDate: Thu Sep 24 20:16:10 2020 +0200 [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99) Co-authored-by: Gyula Fora --- flink-connector-kudu/pom.xml | 32 --- .../kudu/connector/reader/KuduReader.java | 2 +- .../kudu/connector/writer/KuduWriter.java | 2 +- .../connectors/kudu/table/KuduTableFactory.java| 55 +++- .../flink/connectors/kudu/table/KuduTableSink.java | 3 - ...utFormatTest.java => KuduOutputFormatTest.java} | 4 +- .../connectors/kudu/streaming/KuduSinkTest.java| 2 +- .../connectors/kudu/table/KuduCatalogTest.java | 98 +- .../kudu/table/KuduTableFactoryTest.java | 89 +++- .../kudu/table/KuduTableSourceITCase.java | 16 ++-- .../connectors/kudu/table/KuduTableSourceTest.java | 5 +- .../connectors/kudu/table/KuduTableTestUtils.java | 2 +- .../{log4j.properties => log4j2-test.properties} | 17 ++-- 13 files changed, 170 insertions(+), 157 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index bbf168e..a76102e 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -30,7 +30,7 @@ jar -1.11.1 +1.13.0 1.10.19 @@ -79,6 +79,13 @@ test + +org.apache.flink +flink-clients_${scala.binary.version} +${flink.version} +test + + org.junit.jupiter @@ -95,17 +102,22 @@ - org.slf4j - slf4j-log4j12 - ${slf4j.version} - runtime + org.apache.logging.log4j + log4j-api + ${log4j2.version} + test - - log4j - log4j - ${log4j.version} - runtime + org.apache.logging.log4j + log4j-core + ${log4j2.version} + test + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j2.version} + test diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java index 51ab748..d7a0c61 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java @@ -82,7 +82,7 @@ public class KuduReader implements AutoCloseable { if (tableInfo.getCreateTableIfNotExists()) { return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions()); } -throw new UnsupportedOperationException("table not exists and is marketed to not be created"); +throw new RuntimeException("Table " + tableName + " does not exist."); } public KuduReaderIterator scanner(byte[] token) throws IOException { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java index 7233478..03c37ea 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -83,7 +83,7 @@ public class KuduWriter implements AutoCloseable { if (tableInfo.getCreateTableIfNotExists()) { return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions()); } -throw new UnsupportedOperationException("table not exists and is marketed to not be created"); +throw new RuntimeException("Table " + tableName + " does not exist."); } public void write(T input) throws IOException { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java index eb72205..1961aad 100644 --- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java @@ -30,7 +30,6 @@ import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.types.Row; -import static org.apache.flink.util.Preconditions.checkNotNull; import java.util.ArrayList; import java.util.HashMap; @@ -38,25 +37,10 @@ import java.util.
(flink-connector-kudu) 32/44: [BAHIR-321] Make KuduFilterInfo handle String literals
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 2e75a60111bd48d670e94b8b6fc1b2eb52fd70fe Author: Balazs Varga AuthorDate: Tue Jan 3 18:07:55 2023 +0100 [BAHIR-321] Make KuduFilterInfo handle String literals --- .../connectors/kudu/connector/KuduFilterInfo.java | 4 +- .../function/lookup/KuduRowDataLookupFunction.java | 8 +-- .../kudu/connector/KuduFilterInfoTest.java | 41 .../connectors/kudu/connector/KuduTestBase.java| 2 +- .../kudu/table/dynamic/KuduDynamicSourceTest.java | 57 ++ 5 files changed, 107 insertions(+), 5 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java index 08fa86b..e7a8d16 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java @@ -17,6 +17,7 @@ package org.apache.flink.connectors.kudu.connector; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.binary.BinaryStringData; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; @@ -65,7 +66,8 @@ public class KuduFilterInfo implements Serializable { switch (column.getType()) { case STRING: -predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value); +predicate = KuduPredicate.newComparisonPredicate(column, comparison, +(this.value instanceof BinaryStringData) ? 
this.value.toString() : (String) this.value); break; case FLOAT: predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) this.value); diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java index 4a4a952..d54f23a 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java @@ -178,9 +178,11 @@ public class KuduRowDataLookupFunction extends TableFunction { if (null != this.kuduReader) { try { this.kuduReader.close(); -this.cache.cleanUp(); -// help gc -this.cache = null; +if (cache != null) { +this.cache.cleanUp(); +// help gc +this.cache = null; +} this.kuduReader = null; } catch (IOException e) { // ignore exception when close. diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java new file mode 100644 index 000..a6c2ff7 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.table.data.binary.BinaryStringData; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Type; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +public class KuduFilterInfoTest { + +@Test +void testKuduFilterInfoWithBinaryStringData() { +String filterValue = "someValue"; + +KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create("col") +.equalTo(BinaryStringData.fromStri
(flink-connector-kudu) 33/44: [BAHIR-308] Remove support for scala 2.11
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 5a1da3182cbf2c2afc0800a9da750c031a9915b2 Author: Joao Boto AuthorDate: Wed Jul 6 16:42:06 2022 +0200 [BAHIR-308] Remove support for scala 2.11 --- flink-connector-kudu/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index fd485d5..20b16b4 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -20,12 +20,12 @@ org.apache.bahir -bahir-flink-parent_2.11 +bahir-flink-parent_2.12 1.2-SNAPSHOT ../pom.xml - flink-connector-kudu_2.11 + flink-connector-kudu_2.12 jar flink-connector-kudu
(flink-connector-kudu) 26/44: [BAHIR-302] Add enforcers
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit e9c2a761dc5461e7fe229d837daf412a3e25fd99 Author: Joao Boto AuthorDate: Mon Mar 28 17:18:05 2022 +0200 [BAHIR-302] Add enforcers --- flink-connector-kudu/pom.xml | 143 --- 1 file changed, 92 insertions(+), 51 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 750929c..2a2dde2 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -22,99 +22,140 @@ org.apache.bahir bahir-flink-parent_2.11 1.1-SNAPSHOT -.. +../pom.xml flink-connector-kudu_2.11 - flink-connector-kudu jar + flink-connector-kudu + 1.13.0 - - - - org.apache.kudu - kudu-client - ${kudu.version} - + + + +org.apache.flink +flink-clients_${scala.binary.version} +${flink.version} +test + + +org.apache.flink +flink-streaming-java_${scala.binary.version} +${flink.version} +provided + + +org.apache.flink +flink-table-common +${flink.version} +provided + + +org.apache.flink +flink-table-planner_${scala.binary.version} +${flink.version} +provided + + +org.apache.kudu +kudu-binary +${kudu.version} +${os.detected.classifier} +test + + +org.apache.kudu +kudu-client +${kudu.version} + + +org.apache.kudu +kudu-test-utils +${kudu.version} +test + + +org.apache.logging.log4j +log4j-api +${log4j.version} +test + + +org.apache.logging.log4j +log4j-core +${log4j.version} +test + + +org.apache.logging.log4j +log4j-slf4j-impl +${log4j.version} +test + + + +org.junit.jupiter +junit-jupiter-migrationsupport +${junit.jupiter.version} +test + + + + org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided + flink-clients_${scala.binary.version} - org.apache.flink - flink-table-planner_${scala.binary.version} - ${flink.version} - provided + flink-streaming-java_${scala.binary.version} - org.apache.flink flink-table-common - ${flink.version} 
- provided - - - org.apache.kudu - kudu-test-utils - ${kudu.version} - test + org.apache.flink + flink-table-planner_${scala.binary.version} - org.apache.kudu kudu-binary - ${kudu.version} ${os.detected.classifier} - test - - - -org.apache.flink -flink-clients_${scala.binary.version} -${flink.version} -test - - - org.junit.jupiter - junit-jupiter-migrationsupport - ${junit.jupiter.version} - test + org.apache.kudu + kudu-client - - org.mockito - mockito-all + org.apache.kudu + kudu-test-utils - org.apache.logging.log4j log4j-api - ${log4j.version} - test org.apache.logging.log4j log4j-core - ${log4j.version} - test org.apache.logging.log4j log4j-slf4j-impl - ${log4j.version} - test + + + + org.junit.jupiter + junit-jupiter-migrationsupport + + + org.mockito + mockito-all
(flink-connector-kudu) 21/44: [BAHIR-260] Add kudu table writer config (#109)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 65004752884890ee49dbffd5bfb52a2a863758ed Author: hackergin AuthorDate: Wed Apr 7 06:35:07 2021 -0500 [BAHIR-260] Add kudu table writer config (#109) --- .../connectors/kudu/connector/KuduTableInfo.java | 18 .../kudu/connector/writer/KuduWriter.java | 5 + .../kudu/connector/writer/KuduWriterConfig.java| 113 - .../connectors/kudu/table/KuduTableFactory.java| 49 - .../flink/connectors/kudu/table/KuduTableSink.java | 21 .../kudu/table/KuduTableFactoryTest.java | 44 +++- 6 files changed, 243 insertions(+), 7 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java index 83c7dde..baae8a0 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java @@ -23,6 +23,7 @@ import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; import java.io.Serializable; +import java.util.Objects; /** * Describes which table should be used in sources and sinks along with specifications @@ -103,4 +104,21 @@ public class KuduTableInfo implements Serializable { } return createTableOptionsFactory.getCreateTableOptions(); } + +@Override +public int hashCode() { +return Objects.hash(name); +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +KuduTableInfo that = (KuduTableInfo) o; +return Objects.equals(this.name, that.name); +} } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java index 03c37ea..59ad196 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -72,6 +72,11 @@ public class KuduWriter implements AutoCloseable { private KuduSession obtainSession() { KuduSession session = client.newSession(); session.setFlushMode(writerConfig.getFlushMode()); +session.setTimeoutMillis(writerConfig.getOperationTimeout()); +session.setMutationBufferSpace(writerConfig.getMaxBufferSize()); +session.setFlushInterval(writerConfig.getFlushInterval()); +session.setIgnoreAllDuplicateRows(writerConfig.isIgnoreDuplicate()); +session.setIgnoreAllNotFoundRows(writerConfig.isIgnoreNotFound()); return session; } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java index 598f8d0..ff93921 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java @@ -19,8 +19,10 @@ package org.apache.flink.connectors.kudu.connector.writer; import org.apache.flink.annotation.PublicEvolving; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.kudu.client.AsyncKuduClient; import java.io.Serializable; +import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.kudu.client.SessionConfiguration.FlushMode; @@ -34,13 +36,28 @@ public class KuduWriterConfig implements Serializable { private final String masters; private final FlushMode flushMode; +private final long operationTimeout; +private int 
maxBufferSize; +private int flushInterval; +private boolean ignoreNotFound; +private boolean ignoreDuplicate; private KuduWriterConfig( String masters, -FlushMode flushMode) { +FlushMode flushMode, +long operationTimeout, +int maxBufferSize, +int flushInterval, +boolean ignoreNotFound, +boolean ignoreDuplicate) { this.masters = checkNotNull(masters, "Kudu masters cannot be null"); this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null"); +this.operationTimeout = operationTimeout; +this.maxBufferSize = max
(flink-connector-kudu) 12/44: [BAHIR-214] Improve speed and solve eventual consistence issues (#64)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 73e87f9c2e9d3c5ef59bbe723a930b2cf5cffc1d Author: Joao Boto AuthorDate: Tue Sep 3 02:05:56 2019 +0200 [BAHIR-214] Improve speed and solve eventual consistence issues (#64) * resolve eventual consistency issues * improve speed special on eventual consistency stream * Update Readme --- flink-connector-kudu/README.md | 60 +++--- .../connectors/kudu/batch/KuduInputFormat.java | 137 + .../connectors/kudu/batch/KuduOutputFormat.java| 93 + .../connectors/kudu/connector/KuduColumnInfo.java | 4 +- .../connectors/kudu/connector/KuduFilterInfo.java | 5 +- .../connectors/kudu/connector/KuduRow.java | 4 +- .../connectors/kudu/connector/KuduTableInfo.java | 4 +- .../failure/DefaultKuduFailureHandler.java | 33 .../kudu/connector/failure/KuduFailureHandler.java | 37 .../kudu/connector/reader/KuduInputSplit.java | 39 .../kudu/connector/reader/KuduReader.java | 170 + .../kudu/connector/reader/KuduReaderConfig.java| 82 .../kudu/connector/reader/KuduReaderIterator.java | 112 +++ .../kudu/connector}/serde/DefaultSerDe.java| 4 +- .../kudu/connector}/serde/KuduDeserialization.java | 4 +- .../kudu/connector}/serde/KuduSerialization.java | 4 +- .../kudu/connector}/serde/PojoSerDe.java | 6 +- .../kudu/connector/writer/KuduWriter.java | 209 .../kudu/connector/writer/KuduWriterConfig.java| 113 +++ .../connector/writer/KuduWriterConsistency.java| 32 .../kudu/connector/writer/KuduWriterMode.java} | 18 +- .../flink/connectors/kudu/streaming/KuduSink.java | 89 + .../streaming/connectors/kudu/KuduInputFormat.java | 211 - .../connectors/kudu/KuduOutputFormat.java | 121 .../flink/streaming/connectors/kudu/KuduSink.java | 157 --- .../connectors/kudu/connector/KuduConnector.java | 170 - .../connectors/kudu/connector/KuduMapper.java | 143 -- .../connectors/kudu/connector/KuduRowIterator.java | 57 -- 
.../kudu/batch}/KuduInputFormatTest.java | 41 ++-- .../kudu/batch}/KuduOuputFormatTest.java | 50 +++-- .../connectors/kudu/connector/KuduDatabase.java| 48 - .../kudu/connector}/serde/PojoSerDeTest.java | 8 +- .../connectors/kudu/streaming/KuduSinkTest.java| 159 .../streaming/connectors/kudu/KuduSinkTest.java| 109 --- 34 files changed, 1467 insertions(+), 1066 deletions(-) diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md index 9b75aa7..8692ca5 100644 --- a/flink-connector-kudu/README.md +++ b/flink-connector-kudu/README.md @@ -29,15 +29,19 @@ env.setParallelism(PARALLELISM); // create a table info object KuduTableInfo tableInfo = KuduTableInfo.Builder .create("books") -.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) -.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) -.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) -.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) -.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build()) +.addColumn(KuduColumnInfo.Builder.createString("title").build()) +.addColumn(KuduColumnInfo.Builder.createString("author").build()) +.addColumn(KuduColumnInfo.Builder.createDouble("price").build()) +.addColumn(KuduColumnInfo.Builder.createInteger("quantity").build()) .build(); - +// create a reader configuration +KuduReaderConfig readerConfig = KuduReaderConfig.Builder +.setMasters("172.25.0.6") +.setRowLimit(1000) +.build(); // Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips -env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo)) +env.createInput(new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe())) .count(); env.execute(); @@ -54,18 +58,23 @@ env.setParallelism(PARALLELISM); KuduTableInfo tableInfo = KuduTableInfo.Builder .create("books") .createIfNotExist(true) 
-.replicas(1) -.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) -.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) -.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) -.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()
(flink-connector-kudu) 31/44: [BAHIR-312] Add license header to README.md files
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit bef51ab2af1db28f82c4bea5864fd23202550bed Author: Joao Boto AuthorDate: Sat Jul 30 17:45:39 2022 +0200 [BAHIR-312] Add license header to README.md files --- flink-connector-kudu/README.md | 16 1 file changed, 16 insertions(+) diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md index c52adb1..7908e08 100644 --- a/flink-connector-kudu/README.md +++ b/flink-connector-kudu/README.md @@ -1,3 +1,19 @@ + + # Flink Kudu Connector This connector provides a source (```KuduInputFormat```), a sink/output
(flink-connector-kudu) 27/44: [BAHIR-243] Change KuduTestHarness with TestContainers
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 6832f05fd4b73ef3583e24e9968ccbd0faa4 Author: Joao Boto AuthorDate: Thu Mar 11 19:03:43 2021 +0100 [BAHIR-243] Change KuduTestHarness with TestContainers --- flink-connector-kudu/pom.xml | 4 + .../connectors/kudu/batch/KuduInputFormatTest.java | 4 +- .../kudu/batch/KuduOutputFormatTest.java | 8 +- .../connectors/kudu/connector/KuduTestBase.java| 85 +- .../connectors/kudu/streaming/KuduSinkTest.java| 11 ++- .../connectors/kudu/table/KuduCatalogTest.java | 20 ++--- .../kudu/table/KuduTableFactoryTest.java | 10 +-- .../kudu/table/KuduTableSourceITCase.java | 2 +- .../connectors/kudu/table/KuduTableSourceTest.java | 2 +- 9 files changed, 99 insertions(+), 47 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 2a2dde2..9ace382 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -157,6 +157,10 @@ org.mockito mockito-all + + org.testcontainers + testcontainers + diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java index fb7d8e3..126f7fd 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java @@ -39,7 +39,7 @@ class KuduInputFormatTest extends KuduTestBase { @Test void testInvalidTableInfo() { -String masterAddresses = harness.getMasterAddressesAsString(); +String masterAddresses = getMasterAddress(); KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null)); } @@ 
-71,7 +71,7 @@ class KuduInputFormatTest extends KuduTestBase { } private List readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception { -String masterAddresses = harness.getMasterAddressesAsString(); +String masterAddresses = getMasterAddress(); KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(), fieldProjection == null ? null : Arrays.asList(fieldProjection)); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java index 22fa0a4..693e113 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java @@ -39,14 +39,14 @@ class KuduOutputFormatTest extends KuduTestBase { @Test void testInvalidTableInfo() { -String masterAddresses = harness.getMasterAddressesAsString(); +String masterAddresses = getMasterAddress(); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, null)); } @Test void testNotTableExist() { -String masterAddresses = harness.getMasterAddressesAsString(); +String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); KuduOutputFormat outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); @@ -55,7 +55,7 @@ class KuduOutputFormatTest extends KuduTestBase { @Test void 
testOutputWithStrongConsistency() throws Exception { -String masterAddresses = harness.getMasterAddressesAsString(); +String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true); KuduWriterConfig writerConfig = KuduWriterConfig.Builder @@ -80,7 +80,7 @@ class KuduOutputFormatTest extends KuduTestBase { @Test void testOutputWithEventualConsistency() throws Exception { -String masterAddresses = harness.getMasterAddressesAsString(); +String masterAddresses = getMasterAddress();
(flink-connector-kudu) 01/44: [FLINK-34930] Initialize tools/ and vcs.xml
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 427098db43f40fde797efbeb16723eb4e7cef233 Author: Ferenc Csaky AuthorDate: Tue Mar 26 15:21:33 2024 +0100 [FLINK-34930] Initialize tools/ and vcs.xml --- .gitmodules | 4 + .idea/vcs.xml| 25 ++ tools/ci/log4j.properties| 43 tools/maven/checkstyle.xml | 562 +++ tools/maven/suppressions.xml | 26 ++ 5 files changed, 660 insertions(+) diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000..e5d40f3 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "tools/releasing/shared"] + path = tools/releasing/shared + url = https://github.com/apache/flink-connector-shared-utils + branch = release_utils diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 000..31d1734 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,25 @@ + + + + + + + + https://issues.apache.org/jira/browse/$0"; /> + + + + https://cwiki.apache.org/confluence/display/FLINK/$0"; /> + + + + https://github.com/apache/flink-connector-kudu/pull/$1"; /> + + + + + + + + + diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties new file mode 100644 index 000..7daf1c3 --- /dev/null +++ b/tools/ci/log4j.properties @@ -0,0 +1,43 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +rootLogger.level = INFO +rootLogger.appenderRef.out.ref = ConsoleAppender + +# - +# Console (use 'console') +# - + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n + +# - +# File (use 'file') +# - +appender.file.name = FileAppender +appender.file.type = FILE +appender.file.fileName = ${sys:log.dir}/mvn-${sys:mvn.forkNumber:-output}.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n +appender.file.createOnDemand = true + +# suppress the irrelevant (wrong) warnings from the netty channel handler +logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.level = ERROR diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml new file mode 100644 index 000..2048fd1 --- /dev/null +++ b/tools/maven/checkstyle.xml @@ -0,0 +1,562 @@ + + +http://www.puppycrawl.com/dtds/configuration_1_3.dtd";> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
(flink-connector-kudu) 10/44: [BAHIR-209] upgrade kudu version to 1.10.0 (#61)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 02deb5127a163c32e75d2c3ed23614c0934f109e Author: Joao Boto AuthorDate: Thu Jul 4 00:11:57 2019 +0200 [BAHIR-209] upgrade kudu version to 1.10.0 (#61) --- flink-connector-kudu/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index a504b89..bd42c55 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -30,7 +30,7 @@ jar -1.9.0 +1.10.0 1.10.19 !DockerTest
(flink-connector-kudu) branch main updated (d837674 -> 8674446)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git from d837674 Initial commit new 427098d [FLINK-34930] Initialize tools/ and vcs.xml new 663ffd1 [FLINK-34930] Remove Python CI new 37a3e55 [BAHIR-99] kudu connector new 701fabe [BAHIR-179] Fail Docker integration tests silently new f0cd1d9 [BAHIR-194] bump kudu version to 1.8.0 new 247dfe9 [BAHIR-180] Improve eventual consistence for Kudu connector new 1e26505 [BAHIR-199] Bump kudu version to 1.9.0 new f9c75bb [BAHIR-202] Improve KuduSink throughput using async FlushMode new a1b4da5 [BAHIR-200] Move tests from docker to kudu-test-utils (#49) new 02deb51 [BAHIR-209] upgrade kudu version to 1.10.0 (#61) new 47133a3 [BAHIR-207] Add tests for scala 2.12 on travis (#59) new 73e87f9 [BAHIR-214] Improve speed and solve eventual consistence issues (#64) new c380e40 Add support "upsert part of columns of a kudu table" (#70) new 9b99c18 Fix "the client has already been closed" bug (#75) new 6da6ad7 Fix NotSerializableException in Kudu connector (#74) new 3c57f2e Kudu Connector rework (#78) new 6e8b66f Add batch table env support and filter push down to Kudu connector (#82) new 3f9db63 BAHIR-240: replace docker test by testcontainer (#89) new 7b5d717 [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99) new 4a5ec77 [BAHIR-263] Update flink to 1.12.2 (#115) new 6500475 [BAHIR-260] Add kudu table writer config (#109) new 03f0312 [BAHIR-293] Fix documentation tables new b9cd23e [BAHIR-291] Bump flink to 1.14.0 (#136) new 7700edb [BAHIR-296] Unify log4j libs new 1dce829 BAHIR-296: Unify mockito version to 1.10.19 new e9c2a76 [BAHIR-302] Add enforcers new 6832f05 [BAHIR-243] Change KuduTestHarness with TestContainers new 285a6e1 [BAHIR-302] Group declaration of flink dependencies on parent pom new af8dff4 [maven-release-plugin] prepare for next development iteration new aca6e7b [BAHIR-305] Kudu 
Flink SQL Support DynamicSource/Sink&LookupFunction new bef51ab [BAHIR-312] Add license header to README.md files new 2e75a60 [BAHIR-321] Make KuduFilterInfo handle String literals new 5a1da31 [BAHIR-308] Remove support for scala 2.11 new 6ff53e7 [BAHIR-308] Bump flink version to 1.15.3 new 78c76c5 [BAHIR-308] Remove scala prefix where we can new d8c803a [BAHIR-324] Closing KuduReader at JobManager new 0335544 [FLINK-34930] Adapt POM files to the new structure, remove flink-shaded Guava usage new abf6eba [FLINK-34930] Apply spotless code format and checkstyle rules new 34cb67f [FLINK-34930] Rename base package new abc4a3b [FLINK-34930] Fix KuduTableSourceITCase new 9253b9d [FLINK-34930] State Bahir fork new c7dfb67 [FLINK-34930] Skip spotless for JDK 21+ new 5230ec1 [FLINK-34930] Enable module opens for tests for newer JDKs new 8674446 [FLINK-34930] Migrate Bahir NOTICE The 44 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .github/workflows/push_pr.yml | 7 - .gitmodules| 4 + .idea/vcs.xml | 25 + NOTICE | 6 +- README.md | 6 +- flink-connector-kudu/README.md | 324 flink-connector-kudu/pom.xml | 97 .../kudu/connector/ColumnSchemasFactory.java | 43 ++ .../kudu/connector/CreateTableOptionsFactory.java | 42 ++ .../connector/kudu/connector/KuduFilterInfo.java | 199 .../connector/kudu/connector/KuduTableInfo.java| 127 + .../connector/converter/RowResultConverter.java| 41 ++ .../connector/converter/RowResultRowConverter.java | 46 ++ .../converter/RowResultRowDataConverter.java | 104 .../failure/DefaultKuduFailureHandler.java | 38 ++ .../kudu/connector/failure/KuduFailureHandler.java | 54 ++ .../kudu/connector/reader/KuduInputSplit.java | 46 ++ .../kudu/connector/reader/KuduReader.java | 198 .../kudu/connector/reader/KuduReaderConfig.java| 89 .../kudu/connector/reader/KuduReaderIterator.java | 68 +++ .../writer/AbstractSingleOperationMapper.java | 108 .../kudu/connector/writer/KuduOperationMapper.java | 46 ++ .../kudu/connector/writer/KuduWriter.java | 168 ++ .../kudu/connector/writer/KuduWriterConfig.java| 206 .../kudu/connector/writer/PojoOperationMapper.java | 86 .../writer/RowDa
(flink-connector-kudu) 29/44: [maven-release-plugin] prepare for next development iteration
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit af8dff48d082bf8f340e181a8c5719993de47eef Author: Joao Boto AuthorDate: Thu Jun 16 15:44:53 2022 +0200 [maven-release-plugin] prepare for next development iteration --- flink-connector-kudu/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 6f68522..fd485d5 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -21,7 +21,7 @@ org.apache.bahir bahir-flink-parent_2.11 -1.1-SNAPSHOT +1.2-SNAPSHOT ../pom.xml
(flink-connector-kudu) 09/44: [BAHIR-200] Move tests from docker to kudu-test-utils (#49)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit a1b4da53f4a73eaa22e5dbb4d22e0ed897ec4f0a Author: Joao Boto AuthorDate: Sun May 26 02:45:25 2019 +0200 [BAHIR-200] Move tests from docker to kudu-test-utils (#49) --- flink-connector-kudu/dockers/docker-compose.yml| 92 -- flink-connector-kudu/dockers/role/Dockerfile | 41 -- .../dockers/role/docker-entrypoint.sh | 69 flink-connector-kudu/dockers/run_kudu_tests.sh | 68 flink-connector-kudu/dockers/start-images.sh | 42 -- flink-connector-kudu/dockers/stop-images.sh| 33 flink-connector-kudu/pom.xml | 42 +++--- .../connectors/kudu/connector/KuduFilterInfo.java | 4 +- .../streaming/connectors/kudu/DockerTest.java | 31 .../connectors/kudu/KuduInputFormatTest.java | 8 +- .../connectors/kudu/KuduOuputFormatTest.java | 13 +-- .../streaming/connectors/kudu/KuduSinkTest.java| 29 +-- .../connectors/kudu/connector/KuduDatabase.java| 16 +++- 13 files changed, 82 insertions(+), 406 deletions(-) diff --git a/flink-connector-kudu/dockers/docker-compose.yml b/flink-connector-kudu/dockers/docker-compose.yml deleted file mode 100644 index d2c95bb..000 --- a/flink-connector-kudu/dockers/docker-compose.yml +++ /dev/null @@ -1,92 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -version: '2' - -services: - - kudu-master: -image: eskabetxe/kudu -container_name: kudu-master -hostname: 172.25.0.6 -ports: - - "8051:8051" -volumes: - - /var/lib/kudu/master -command: master -networks: - mynet: -ipv4_address: 172.25.0.6 - - kudu-server1: -image: eskabetxe/kudu -container_name: kudu-server1 -hostname: 172.25.0.7 -environment: - - KUDU_MASTER=172.25.0.6 -ports: - - "8054:8050" -volumes: - - /var/lib/kudu/server -command: tserver -networks: - mynet: -ipv4_address: 172.25.0.7 -links: - - kudu-master - - kudu-server2: -image: eskabetxe/kudu -container_name: kudu-server2 -hostname: 172.25.0.8 -environment: - - KUDU_MASTER=172.25.0.6 -ports: - - "8052:8050" -volumes: - - /var/lib/kudu/server -command: tserver -networks: - mynet: -ipv4_address: 172.25.0.8 -links: - - kudu-master - - kudu-server3: -image: eskabetxe/kudu -container_name: kudu-server3 -hostname: 172.25.0.9 -environment: - - KUDU_MASTER=172.25.0.6 -ports: - - "8053:8050" -volumes: - - /var/lib/kudu/server -command: tserver -networks: - mynet: -ipv4_address: 172.25.0.9 -links: - - kudu-master - -networks: - mynet: -driver: bridge -ipam: - config: -- subnet: 172.25.0.0/24 - IPRange: 172.25.0.2/24, - gateway: 172.25.0.1 diff --git a/flink-connector-kudu/dockers/role/Dockerfile b/flink-connector-kudu/dockers/role/Dockerfile deleted file mode 100644 index b14b087..000 --- a/flink-connector-kudu/dockers/role/Dockerfile +++ /dev/null @@ -1,41 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -FROM bitnami/minideb:jessie -MAINTAINER eskabetxe - -RUN set -x \ -&& apt-get update \ -&& apt-get
(flink-connector-kudu) 08/44: [BAHIR-202] Improve KuduSink throughput using async FlushMode
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit f9c75bbc99f190fd562215ee1dfef53a36241cda Author: SuXingLee <913910...@qq.com> AuthorDate: Fri Mar 22 20:08:33 2019 +0800 [BAHIR-202] Improve KuduSink throughput using async FlushMode By default, KuduSink processes messages one by one without checkpointing. When checkpointing is enabled, throughput is improved by using FlushMode.AUTO_FLUSH_BACKGROUND, and checkpoints are used to ensure at-least-once delivery. Closes #50 --- .../connectors/kudu/KuduOutputFormat.java | 5 +- .../flink/streaming/connectors/kudu/KuduSink.java | 59 -- .../connectors/kudu/connector/KuduConnector.java | 18 +-- 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java index 9d12710..c1301da 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; import org.apache.flink.util.Preconditions; +import org.apache.kudu.client.SessionConfiguration.FlushMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,8 @@ import java.io.IOException; public class KuduOutputFormat extends RichOutputFormat { +private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); private String kuduMasters; @@ -87,7 +90,7 @@ public class KuduOutputFormat extends 
RichOutputFormat { @Override public void open(int taskNumber, int numTasks) throws IOException { if (connector != null) return; -connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode); +connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode,FlushMode.AUTO_FLUSH_SYNC); serializer = serializer.withSchema(tableInfo.getSchema()); } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java index 53cf249..b6dd9c8 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java @@ -17,18 +17,25 @@ package org.apache.flink.streaming.connectors.kudu; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; import org.apache.flink.util.Preconditions; +import org.apache.kudu.client.SessionConfiguration.FlushMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -public class KuduSink extends RichSinkFunction { +public class KuduSink extends RichSinkFunction implements CheckpointedFunction { + +private static final long serialVersionUID = 1L; private static final Logger LOG = 
LoggerFactory.getLogger(KuduOutputFormat.class); @@ -36,6 +43,7 @@ public class KuduSink extends RichSinkFunction { private KuduTableInfo tableInfo; private KuduConnector.Consistency consistency; private KuduConnector.WriteMode writeMode; +private FlushMode flushMode; private KuduSerialization serializer; @@ -77,11 +85,42 @@ public class KuduSink extends RichSinkFunction { return this; } +public KuduSink withSyncFlushMode() { +this.flushMode = FlushMode.AUTO_FLUSH_SYNC; +return this; +} + +public KuduSink withAsyncFlushMode() { +this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND; +return this; +} + @Override public void open(Configuration parameters) throws IOException { -if (connector != null) return;
(flink-connector-kudu) 23/44: [BAHIR-291] Bump flink to 1.14.0 (#136)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit b9cd23e729e0d874b7fd53e86a7890f73b939567 Author: Roc Marshal AuthorDate: Mon Dec 27 01:27:08 2021 +0800 [BAHIR-291] Bump flink to 1.14.0 (#136) --- flink-connector-kudu/pom.xml | 2 +- .../java/org/apache/flink/connectors/kudu/table/KuduCatalog.java | 4 ++-- .../org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java | 7 ++- .../org/apache/flink/connectors/kudu/table/KuduTableFactory.java | 2 +- .../org/apache/flink/connectors/kudu/table/KuduTableSource.java| 2 +- .../apache/flink/connectors/kudu/table/utils/KuduTableUtils.java | 2 +- 6 files changed, 8 insertions(+), 11 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index a76102e..ac6cdc5 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -51,7 +51,7 @@ org.apache.flink - flink-table-planner-blink_${scala.binary.version} + flink-table-planner_${scala.binary.version} ${flink.version} provided diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java index 2ca7c0e..d8343e8 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java @@ -42,7 +42,7 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.util.StringUtils; -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; import org.apache.kudu.ColumnSchema; import org.apache.kudu.client.AlterTableOptions; @@ -237,7 +237,7 @@ public class KuduCatalog extends 
AbstractReadOnlyCatalog { @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException { -Map tableProperties = table.getProperties(); +Map tableProperties = table.getOptions(); TableSchema tableSchema = table.getSchema(); Set optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS)); diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java index 30aaa40..2458a56 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java @@ -31,9 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; - /** * Factory for {@link KuduCatalog}. 
*/ @@ -45,8 +42,8 @@ public class KuduCatalogFactory implements CatalogFactory { @Override public Map requiredContext() { Map context = new HashMap<>(); -context.put(CATALOG_TYPE, KuduTableFactory.KUDU); -context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility +context.put("type", KuduTableFactory.KUDU); +context.put("property-version", "1"); // backwards compatibility return context; } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java index 524f521..a2883af 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java @@ -132,7 +132,7 @@ public class KuduTableFactory implements TableSourceFactory, TableSinkFacto public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) { validateTable(table); String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName()); -return createTableSource(tableName, table.getSchema(), table.getProperties()); +return createTableSource(tableName, table.getSchema(), table.getOptions()); } private KuduTableSource createTableSource(String tableName, TableSchema schema, Map props) { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main
(flink-connector-kudu) 24/44: [BAHIR-296] Unify log4j libs
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 7700edb6a18d08cf2f93f84d4377807af1c826bd Author: Joao Boto AuthorDate: Sun Dec 26 19:39:56 2021 +0100 [BAHIR-296] Unify log4j libs --- flink-connector-kudu/pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index ac6cdc5..12fd9da 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -104,19 +104,19 @@ org.apache.logging.log4j log4j-api - ${log4j2.version} + ${log4j.version} test org.apache.logging.log4j log4j-core - ${log4j2.version} + ${log4j.version} test org.apache.logging.log4j log4j-slf4j-impl - ${log4j2.version} + ${log4j.version} test
(flink-connector-kudu) 14/44: Fix "the client has already been closed" bug (#75)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 9b99c1862cad3c26b0d287c1fb13e15b588b1392 Author: WangJ1an <826204...@qq.com> AuthorDate: Mon Jan 20 04:20:34 2020 +0800 Fix "the client has already been closed" bug (#75) Fix the problem "Caused by: java.lang.IllegalStateException: Cannot proceed, the client has already been closed" when running in Flink local mode --- .../java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java index 3a35e6a..98877d8 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java @@ -98,6 +98,7 @@ public class KuduInputFormat extends RichInputFormat { } if (kuduReader != null) { kuduReader.close(); +kuduReader = null; } }
(flink-connector-kudu) 15/44: Fix NotSerializableException in Kudu connector (#74)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 6da6ad7ad427b23392d3bcbbd43391c05846ad5e Author: Darcy <331046...@qq.com> AuthorDate: Mon Jan 20 04:22:46 2020 +0800 Fix NotSerializableException in Kudu connector (#74) --- .../org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java index c37bc9a..c7ae4a4 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java @@ -21,10 +21,11 @@ import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduPredicate; +import java.io.Serializable; import java.util.List; @PublicEvolving -public class KuduFilterInfo { +public class KuduFilterInfo implements Serializable { private String column; private FilterType type;
(flink-connector-kudu) 04/44: [BAHIR-179] Fail Docker integration tests silently
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 701fabeeb897f13cfcde2e55c8a035c1589e74fc Author: eskabetxe AuthorDate: Wed Jan 9 17:29:32 2019 +0100 [BAHIR-179] Fail Docker integration tests silently When running Docker-based integration tests locally, fail silently if the environment requirements are not available. Closes #38 Closes #35 --- flink-connector-kudu/pom.xml | 42 -- .../streaming/connectors/kudu/DockerTest.java | 31 .../connectors/kudu/KuduInputFormatTest.java | 4 +-- .../connectors/kudu/KuduOuputFormatTest.java | 3 +- .../streaming/connectors/kudu/KuduSinkTest.java| 2 +- .../src/test/resources/log4j.properties| 27 ++ 6 files changed, 67 insertions(+), 42 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 348371b..61ab4a6 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -31,7 +31,8 @@ 1.7.1 -5.2.0 + +!DockerTest @@ -58,45 +59,14 @@ test - - org.junit.jupiter - junit-jupiter-api - ${junit.version} - test - - - default - -true - - - - -org.apache.maven.plugins -maven-surefire-plugin - - -**/*Test.java - - - - - - - - test-kudu - - - -org.apache.maven.plugins -maven-surefire-plugin - - - + docker-test + +DockerTest + diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java new file mode 100644 index 000..070e634 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kudu; + +import org.junit.jupiter.api.Tag; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RetentionPolicy.RUNTIME) +@Tag("DockerTest") +public @interface DockerTest { +} + diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java index 8cfc102..eb9dc00 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java @@ -26,11 +26,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +@DockerTest public class KuduInputFormatTest extends KuduDatabase { - - - @Test public void testInvalidKuduMaster() throws IOException { KuduTableInfo tableInfo = booksTableInfo("books",false); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java index 
6eb5ebe..e282185 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java @@ -26,10 +26,9 @@ import java.io.IOException; import java.util.List; import java.util.UUID; +@DockerTest public class KuduOuputFormatTest extends KuduDatabase { - - @Test public void testInvalidKuduMaster() throws IOException { Ku
(flink-connector-kudu) 22/44: [BAHIR-293] Fix documentation tables
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 03f031288d7f97338e55a8534e59bc473bcfdea1 Author: Joao Boto AuthorDate: Tue Dec 7 11:09:26 2021 +0100 [BAHIR-293] Fix documentation tables --- flink-connector-kudu/README.md | 25 + 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md index 6370aa6..a0e0234 100644 --- a/flink-connector-kudu/README.md +++ b/flink-connector-kudu/README.md @@ -154,18 +154,19 @@ The example uses lambda expressions to implement the functional interfaces. Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/docs/schema_design.html). ### Supported data types -| Flink/SQL | Kudu | -| - |:-:| -|STRING | STRING| -| BOOLEAN |BOOL | -| TINYINT | INT8| -| SMALLINT | INT16| -| INT | INT32| -| BIGINT| INT64 | -| FLOAT | FLOAT | -| DOUBLE|DOUBLE| -| BYTES|BINARY| -| TIMESTAMP(3) |UNIXTIME_MICROS | + +| Flink/SQL| Kudu| +|--|:---:| +| `STRING` | STRING | +| `BOOLEAN`| BOOL| +| `TINYINT`| INT8| +| `SMALLINT` | INT16 | +| `INT`| INT32 | +| `BIGINT` | INT64 | +| `FLOAT` | FLOAT | +| `DOUBLE` | DOUBLE | +| `BYTES` | BINARY | +| `TIMESTAMP(3)` | UNIXTIME_MICROS | Note: * `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp`
(flink-connector-kudu) 02/44: [FLINK-34930] Remove Python CI
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 663ffd1a45b9e675058d708ef4f45546736a985f Author: Ferenc Csaky AuthorDate: Thu Apr 11 20:53:49 2024 +0200 [FLINK-34930] Remove Python CI --- .github/workflows/push_pr.yml | 7 --- 1 file changed, 7 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 20a..72b98a2 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -36,10 +36,3 @@ jobs: with: flink_version: ${{ matrix.flink }} jdk_version: ${{ matrix.jdk }} - python_test: -strategy: - matrix: -flink: [ 1.17.2, 1.18.1, 1.19-SNAPSHOT ] -uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils -with: - flink_version: ${{ matrix.flink }} \ No newline at end of file
(flink-connector-kudu) 07/44: [BAHIR-199] Bump kudu version to 1.9.0
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 1e26505253818da3a5176281f7c5264978e1c71e Author: Joao Boto AuthorDate: Thu Mar 7 21:51:06 2019 +0100 [BAHIR-199] Bump kudu version to 1.9.0 Closes #48 --- flink-connector-kudu/README.md | 2 +- flink-connector-kudu/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md index af2985b..9b75aa7 100644 --- a/flink-connector-kudu/README.md +++ b/flink-connector-kudu/README.md @@ -9,7 +9,7 @@ following dependency to your project: 1.1-SNAPSHOT -*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version). +*Version Compatibility*: This module is compatible with Apache Kudu *1.9.0* (last stable version). Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html). diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 61ab4a6..d51341b 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -30,7 +30,7 @@ jar -1.7.1 +1.9.0 !DockerTest
(flink-connector-kudu) 13/44: Add support "upsert part of columns of a kudu table" (#70)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit c380e400d4c81c83dabf7ff42f4a5bb193492704 Author: Darcy <331046...@qq.com> AuthorDate: Sat Nov 30 04:02:56 2019 +0800 Add support "upsert part of columns of a kudu table" (#70) Sometimes we don't want to upsert all columns of a kudu table. So we need to support the function that upsert part of columns of a kudu table. --- .../flink/connectors/kudu/connector/KuduRow.java | 4 .../kudu/connector/writer/KuduWriter.java | 5 - .../connectors/kudu/batch/KuduOuputFormatTest.java | 2 ++ .../connectors/kudu/connector/KuduDatabase.java| 26 -- .../connectors/kudu/streaming/KuduSinkTest.java| 3 ++- 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java index af78361..78e6e6e 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java @@ -41,6 +41,10 @@ public class KuduRow extends Row { return super.getField(rowNames.get(name)); } +public boolean hasField(String name) { +return rowNames.get(name) != null; +} + public void setField(int pos, String name, Object value) { super.setField(pos, value); this.rowNames.put(name, pos); diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java index f4e2a8a..57c0741 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java +++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -152,7 +152,10 @@ public class KuduWriter implements AutoCloseable { table.getSchema().getColumns().forEach(column -> { String columnName = column.getName(); -Object value = row.getField(column.getName()); +if (!row.hasField(columnName)) { +return; +} +Object value = row.getField(columnName); if (value == null) { partialRow.setNull(columnName); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java index 963a8c0..f14eaa0 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java @@ -71,6 +71,7 @@ class KuduOuputFormatTest extends KuduDatabase { List rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); +kuduRowsTest(rows); cleanDatabase(tableInfo); } @@ -99,6 +100,7 @@ class KuduOuputFormatTest extends KuduDatabase { List rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); +kuduRowsTest(rows); cleanDatabase(tableInfo); } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java index 3d02a1d..cda8c21 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java @@ -56,14 +56,16 @@ public class KuduDatabase { .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) .addColumn(KuduColumnInfo.Builder.create("author", 
Type.STRING).build()) -.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) -.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) +.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).asNullable().build()) +.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).asNullable().build()) .build(); } protected static List booksDataRow() { return Arrays.stream(booksTableData) .map(row -> { +Integer rowId = (Integer)row[0]; +if (rowId % 2 == 1) {
(flink-connector-kudu) 03/44: [BAHIR-99] kudu connector
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git commit 37a3e55b5253027c5f475ef5e3c7c8faa42ffebe Author: Joao Boto AuthorDate: Wed Jul 25 20:17:36 2018 +0200 [BAHIR-99] kudu connector --- flink-connector-kudu/README.md | 98 + flink-connector-kudu/dockers/docker-compose.yml| 92 + flink-connector-kudu/dockers/role/Dockerfile | 41 .../dockers/role/docker-entrypoint.sh | 69 +++ flink-connector-kudu/dockers/run_kudu_tests.sh | 68 +++ flink-connector-kudu/dockers/start-images.sh | 42 flink-connector-kudu/dockers/stop-images.sh| 33 flink-connector-kudu/pom.xml | 103 ++ .../streaming/connectors/kudu/KuduInputFormat.java | 218 + .../connectors/kudu/KuduOutputFormat.java | 110 +++ .../flink/streaming/connectors/kudu/KuduSink.java | 106 ++ .../connectors/kudu/connector/KuduColumnInfo.java | 161 +++ .../connectors/kudu/connector/KuduConnector.java | 133 + .../connectors/kudu/connector/KuduFilterInfo.java | 173 .../connectors/kudu/connector/KuduMapper.java | 146 ++ .../connectors/kudu/connector/KuduRow.java | 137 + .../connectors/kudu/connector/KuduTableInfo.java | 133 + .../connectors/kudu/KuduInputFormatTest.java | 91 + .../connectors/kudu/KuduOuputFormatTest.java | 93 + .../streaming/connectors/kudu/KuduSinkTest.java| 89 + .../connectors/kudu/connector/KuduDatabase.java| 89 + 21 files changed, 2225 insertions(+) diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md new file mode 100644 index 000..af2985b --- /dev/null +++ b/flink-connector-kudu/README.md @@ -0,0 +1,98 @@ +# Flink Kudu Connector + +This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). 
To use this connector, add the +following dependency to your project: + + + org.apache.bahir + flink-connector-kudu_2.11 + 1.1-SNAPSHOT + + +*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version). + +Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. +See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html). + +## Installing Kudu + +Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html). +Optionally, you can use the docker images provided in dockers folder. + +## KuduInputFormat + +``` +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +env.setParallelism(PARALLELISM); + +// create a table info object +KuduTableInfo tableInfo = KuduTableInfo.Builder +.create("books") +.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) +.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) +.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) +.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) +.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) +.build(); + +// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips +env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo)) +.count(); + +env.execute(); +``` + +## KuduOutputFormat + +``` +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +env.setParallelism(PARALLELISM); + +// create a table info object +KuduTableInfo tableInfo = KuduTableInfo.Builder +.create("books") +.createIfNotExist(true) +.replicas(1) +.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) +.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) 
+.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) +.addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) +.addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) +.build(); + +... + +env.fromCollection(books) +.output(new KuduOutputFormat<>("172.25.0.6", tableInfo)); + +env.execute(); +``` + +## KuduSink + +``` +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +env.setParallelism(PARALLELISM); + +// create a table info object +KuduTableInfo tableInfo = KuduTableInfo.Bui