This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit 65004752884890ee49dbffd5bfb52a2a863758ed Author: hackergin <jinfeng1...@gmail.com> AuthorDate: Wed Apr 7 06:35:07 2021 -0500 [BAHIR-260] Add kudu table writer config (#109) --- .../connectors/kudu/connector/KuduTableInfo.java | 18 ++++ .../kudu/connector/writer/KuduWriter.java | 5 + .../kudu/connector/writer/KuduWriterConfig.java | 113 ++++++++++++++++++++- .../connectors/kudu/table/KuduTableFactory.java | 49 ++++++++- .../flink/connectors/kudu/table/KuduTableSink.java | 21 ++++ .../kudu/table/KuduTableFactoryTest.java | 44 +++++++- 6 files changed, 243 insertions(+), 7 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java index 83c7dde..baae8a0 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java @@ -23,6 +23,7 @@ import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; import java.io.Serializable; +import java.util.Objects; /** * Describes which table should be used in sources and sinks along with specifications @@ -103,4 +104,21 @@ public class KuduTableInfo implements Serializable { } return createTableOptionsFactory.getCreateTableOptions(); } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KuduTableInfo that = (KuduTableInfo) o; + return Objects.equals(this.name, that.name); + } } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java index 03c37ea..59ad196 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -72,6 +72,11 @@ public class KuduWriter<T> implements AutoCloseable { private KuduSession obtainSession() { KuduSession session = client.newSession(); session.setFlushMode(writerConfig.getFlushMode()); + session.setTimeoutMillis(writerConfig.getOperationTimeout()); + session.setMutationBufferSpace(writerConfig.getMaxBufferSize()); + session.setFlushInterval(writerConfig.getFlushInterval()); + session.setIgnoreAllDuplicateRows(writerConfig.isIgnoreDuplicate()); + session.setIgnoreAllNotFoundRows(writerConfig.isIgnoreNotFound()); return session; } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java index 598f8d0..ff93921 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java @@ -19,8 +19,10 @@ package org.apache.flink.connectors.kudu.connector.writer; import org.apache.flink.annotation.PublicEvolving; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.kudu.client.AsyncKuduClient; import java.io.Serializable; +import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.kudu.client.SessionConfiguration.FlushMode; @@ -34,13 +36,28 @@ public class KuduWriterConfig implements Serializable { private final String masters; private final FlushMode flushMode; + private final long operationTimeout; + private int maxBufferSize; + private int flushInterval; + private boolean ignoreNotFound; + private boolean ignoreDuplicate; private KuduWriterConfig( String masters, - FlushMode flushMode) { + FlushMode flushMode, + long operationTimeout, + int maxBufferSize, + int flushInterval, + boolean ignoreNotFound, + boolean ignoreDuplicate) { this.masters = checkNotNull(masters, "Kudu masters cannot be null"); this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null"); + this.operationTimeout = operationTimeout; + this.maxBufferSize = maxBufferSize; + this.flushInterval = flushInterval; + this.ignoreNotFound = ignoreNotFound; + this.ignoreDuplicate = ignoreDuplicate; } public String getMasters() { @@ -51,6 +68,26 @@ public class KuduWriterConfig implements Serializable { return flushMode; } + public long getOperationTimeout() { + return operationTimeout; + } + + public int getMaxBufferSize() { + return maxBufferSize; + } + + public int getFlushInterval() { + return flushInterval; + } + + public boolean isIgnoreNotFound() { + return ignoreNotFound; + } + + public boolean isIgnoreDuplicate() { + return ignoreDuplicate; + } + @Override public String toString() { return new ToStringBuilder(this) @@ -65,6 +102,16 @@ public class KuduWriterConfig implements Serializable { public static class Builder { private String masters; private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND; + // Reference from AsyncKuduClientBuilder defaultOperationTimeoutMs. + private long timeout = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS; + // Reference from AsyncKuduSession mutationBufferMaxOps 1000. + private int maxBufferSize = 1000; + // Reference from AsyncKuduSession flushIntervalMillis 1000. + private int flushInterval = 1000; + // Reference from AsyncKuduSession ignoreAllNotFoundRows false. + private boolean ignoreNotFound = false; + // Reference from AsyncKuduSession ignoreAllDuplicateRows false. + private boolean ignoreDuplicate = false; private Builder(String masters) { this.masters = masters; @@ -87,10 +134,72 @@ public class KuduWriterConfig implements Serializable { return setConsistency(FlushMode.AUTO_FLUSH_SYNC); } + public Builder setMaxBufferSize(int maxBufferSize) { + this.maxBufferSize = maxBufferSize; + return this; + } + + public Builder setFlushInterval(int flushInterval) { + this.flushInterval = flushInterval; + return this; + } + + public Builder setOperationTimeout(long timeout) { + this.timeout = timeout; + return this; + } + + public Builder setIgnoreNotFound(boolean ignoreNotFound) { + this.ignoreNotFound = ignoreNotFound; + return this; + } + + public Builder setIgnoreDuplicate(boolean ignoreDuplicate) { + this.ignoreDuplicate = ignoreDuplicate; + return this; + } + public KuduWriterConfig build() { return new KuduWriterConfig( masters, - flushMode); + flushMode, + timeout, + maxBufferSize, + flushInterval, + ignoreNotFound, + ignoreDuplicate); + } + + @Override + public int hashCode() { + int result = + Objects.hash( + masters, + flushMode, + timeout, + maxBufferSize, + flushInterval, + ignoreNotFound, + ignoreDuplicate); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Builder that = (Builder) o; + return Objects.equals(masters, that.masters) + && Objects.equals(flushMode, that.flushMode) + && Objects.equals(timeout, that.timeout) + && Objects.equals(maxBufferSize, that.maxBufferSize) + && Objects.equals(flushInterval, that.flushInterval) + && Objects.equals(ignoreNotFound, that.ignoreNotFound) + && Objects.equals(ignoreDuplicate, that.ignoreDuplicate); } } } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java index 1961aad..524f521 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java @@ -35,11 +35,28 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.*; -import static org.apache.flink.table.descriptors.Rowtime.*; -import static org.apache.flink.table.descriptors.Schema.*; +import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; import static org.apache.flink.util.Preconditions.checkNotNull; public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> { @@ -49,6 +66,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto public static final String KUDU_HASH_COLS = "kudu.hash-columns"; public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns"; public static final String KUDU_REPLICAS = "kudu.replicas"; + public static final String KUDU_MAX_BUFFER_SIZE = "kudu.max-buffer-size"; + public static final String KUDU_FLUSH_INTERVAL = "kudu.flush-interval"; + public static final String KUDU_OPERATION_TIMEOUT = "kudu.operation-timeout"; + public static final String KUDU_IGNORE_NOT_FOUND = "kudu.ignore-not-found"; + public static final String KUDU_IGNORE_DUPLICATE = "kudu.ignore-duplicate"; public static final String KUDU = "kudu"; @Override @@ -65,6 +87,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto properties.add(KUDU_MASTERS); properties.add(KUDU_HASH_COLS); properties.add(KUDU_PRIMARY_KEY_COLS); + properties.add(KUDU_MAX_BUFFER_SIZE); + properties.add(KUDU_FLUSH_INTERVAL); + properties.add(KUDU_OPERATION_TIMEOUT); + properties.add(KUDU_IGNORE_NOT_FOUND); + properties.add(KUDU_IGNORE_DUPLICATE); // schema properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE); properties.add(SCHEMA + ".#." + SCHEMA_TYPE); @@ -123,10 +150,12 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) { validateTable(table); String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName()); - return createTableSink(tableName, table.getSchema(), table.getProperties()); + return createTableSink(tableName, table.getSchema(), table.toProperties()); } private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) { + DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(props); String masterAddresses = props.get(KUDU_MASTERS); TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema); KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props); @@ -134,6 +163,18 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder .setMasters(masterAddresses); + Optional<Long> operationTimeout = properties.getOptionalLong(KUDU_OPERATION_TIMEOUT); + Optional<Integer> flushInterval = properties.getOptionalInt(KUDU_FLUSH_INTERVAL); + Optional<Integer> bufferSize = properties.getOptionalInt(KUDU_MAX_BUFFER_SIZE); + Optional<Boolean> ignoreNotFound = properties.getOptionalBoolean(KUDU_IGNORE_NOT_FOUND); + Optional<Boolean> ignoreDuplicate = properties.getOptionalBoolean(KUDU_IGNORE_DUPLICATE); + + operationTimeout.ifPresent(time -> configBuilder.setOperationTimeout(time)); + flushInterval.ifPresent(interval -> configBuilder.setFlushInterval(interval)); + bufferSize.ifPresent(size -> configBuilder.setMaxBufferSize(size)); + ignoreNotFound.ifPresent(i -> configBuilder.setIgnoreNotFound(i)); + ignoreDuplicate.ifPresent(i -> configBuilder.setIgnoreDuplicate(i)); + return new KuduTableSink(configBuilder, tableInfo, physicalSchema); } } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java index 99325c0..5ada84e 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java @@ -30,6 +30,8 @@ import org.apache.flink.table.sinks.UpsertStreamTableSink; import org.apache.flink.table.utils.TableConnectorUtils; import org.apache.flink.types.Row; +import java.util.Objects; + public class KuduTableSink implements UpsertStreamTableSink<Row> { private final KuduWriterConfig.Builder writerConfigBuilder; @@ -68,4 +70,23 @@ public class KuduTableSink implements UpsertStreamTableSink<Row> { @Override public TableSchema getTableSchema() { return flinkSchema; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || o.getClass() != this.getClass()) { + return false; + } + KuduTableSink that = (KuduTableSink) o; + return this.writerConfigBuilder.equals(that.writerConfigBuilder) && + this.flinkSchema.equals(that.flinkSchema) && + this.tableInfo.equals(that.tableInfo); + } + + @Override + public int hashCode() { + return Objects.hash(writerConfigBuilder, flinkSchema, tableInfo); + } } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java index d4de7f6..a8ec686 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java @@ -16,12 +16,21 @@ */ package org.apache.flink.connectors.kudu.table; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.connectors.kudu.connector.KuduTestBase; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.factories.TableSinkFactory; +import org.apache.flink.table.sinks.TableSink; import org.apache.kudu.Type; import org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.KuduTable; @@ -32,12 +41,17 @@ import org.junit.jupiter.api.Test; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class KuduTableFactoryTest extends KuduTestBase { private StreamTableEnvironment tableEnv; @@ -153,4 +167,32 @@ public class KuduTableFactoryTest extends KuduTestBase { assertEquals("f2", rows.get(1).getString("first")); assertEquals("s2", rows.get(1).getString("second")); } + + @Test + public void testTableSink() { + final TableSchema schema = TableSchema.builder() + .field("first", DataTypes.STRING()) + .field("second", DataTypes.STRING()) + .build(); + final Map<String, String> properties = new HashMap<>(); + properties.put("connector.type", "kudu"); + properties.put("kudu.masters", kuduMasters); + properties.put("kudu.table", "TestTable12"); + properties.put("kudu.ignore-not-found", "true"); + properties.put("kudu.ignore-duplicate", "true"); + properties.put("kudu.flush-interval", "10000"); + properties.put("kudu.max-buffer-size", "10000"); + + KuduWriterConfig.Builder builder = KuduWriterConfig.Builder.setMasters(kuduMasters) + .setFlushInterval(10000) + .setMaxBufferSize(10000) + .setIgnoreDuplicate(true) + .setIgnoreNotFound(true); + KuduTableInfo kuduTableInfo = KuduTableInfo.forTable("TestTable12"); + KuduTableSink expected = new KuduTableSink(builder, kuduTableInfo, schema); + final TableSink<?> actualSink = TableFactoryService.find(TableSinkFactory.class, properties) + .createTableSink(ObjectPath.fromString("default.TestTable12"), new CatalogTableImpl(schema, properties, null)); + + assertEquals(expected, actualSink); + } }