This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit 73e87f9c2e9d3c5ef59bbe723a930b2cf5cffc1d Author: Joao Boto <b...@boto.pro> AuthorDate: Tue Sep 3 02:05:56 2019 +0200 [BAHIR-214] Improve speed and solve eventual consistence issues (#64) * resolve eventual consistency issues * improve speed special on eventual consistency stream * Update Readme --- flink-connector-kudu/README.md | 60 +++--- .../connectors/kudu/batch/KuduInputFormat.java | 137 +++++++++++++ .../connectors/kudu/batch/KuduOutputFormat.java | 93 +++++++++ .../connectors/kudu/connector/KuduColumnInfo.java | 4 +- .../connectors/kudu/connector/KuduFilterInfo.java | 5 +- .../connectors/kudu/connector/KuduRow.java | 4 +- .../connectors/kudu/connector/KuduTableInfo.java | 4 +- .../failure/DefaultKuduFailureHandler.java | 33 ++++ .../kudu/connector/failure/KuduFailureHandler.java | 37 ++++ .../kudu/connector/reader/KuduInputSplit.java | 39 ++++ .../kudu/connector/reader/KuduReader.java | 170 +++++++++++++++++ .../kudu/connector/reader/KuduReaderConfig.java | 82 ++++++++ .../kudu/connector/reader/KuduReaderIterator.java | 112 +++++++++++ .../kudu/connector}/serde/DefaultSerDe.java | 4 +- .../kudu/connector}/serde/KuduDeserialization.java | 4 +- .../kudu/connector}/serde/KuduSerialization.java | 4 +- .../kudu/connector}/serde/PojoSerDe.java | 6 +- .../kudu/connector/writer/KuduWriter.java | 209 ++++++++++++++++++++ .../kudu/connector/writer/KuduWriterConfig.java | 113 +++++++++++ .../connector/writer/KuduWriterConsistency.java | 32 ++++ .../kudu/connector/writer/KuduWriterMode.java} | 18 +- .../flink/connectors/kudu/streaming/KuduSink.java | 89 +++++++++ .../streaming/connectors/kudu/KuduInputFormat.java | 211 --------------------- .../connectors/kudu/KuduOutputFormat.java | 121 ------------ .../flink/streaming/connectors/kudu/KuduSink.java | 157 --------------- .../connectors/kudu/connector/KuduConnector.java | 170 ----------------- .../connectors/kudu/connector/KuduMapper.java | 143 -------------- .../connectors/kudu/connector/KuduRowIterator.java | 57 
------ .../kudu/batch}/KuduInputFormatTest.java | 41 ++-- .../kudu/batch}/KuduOuputFormatTest.java | 50 +++-- .../connectors/kudu/connector/KuduDatabase.java | 48 ++++- .../kudu/connector}/serde/PojoSerDeTest.java | 8 +- .../connectors/kudu/streaming/KuduSinkTest.java | 159 ++++++++++++++++ .../streaming/connectors/kudu/KuduSinkTest.java | 109 ----------- 34 files changed, 1467 insertions(+), 1066 deletions(-) diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md index 9b75aa7..8692ca5 100644 --- a/flink-connector-kudu/README.md +++ b/flink-connector-kudu/README.md @@ -29,15 +29,19 @@ env.setParallelism(PARALLELISM); // create a table info object KuduTableInfo tableInfo = KuduTableInfo.Builder .create("books") - .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) - .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) - .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build()) + .addColumn(KuduColumnInfo.Builder.createString("title").build()) + .addColumn(KuduColumnInfo.Builder.createString("author").build()) + .addColumn(KuduColumnInfo.Builder.createDouble("price").build()) + .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build()) .build(); - +// create a reader configuration +KuduReaderConfig readerConfig = KuduReaderConfig.Builder + .setMasters("172.25.0.6") + .setRowLimit(1000) + .build(); // Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips -env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo)) +env.createInput(new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe())) .count(); env.execute(); @@ -54,18 +58,23 @@ env.setParallelism(PARALLELISM); KuduTableInfo tableInfo = 
KuduTableInfo.Builder .create("books") .createIfNotExist(true) - .replicas(1) - .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) - .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) - .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .replicas(3) + .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build()) + .addColumn(KuduColumnInfo.Builder.createString("title").build()) + .addColumn(KuduColumnInfo.Builder.createString("author").build()) + .addColumn(KuduColumnInfo.Builder.createDouble("price").build()) + .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build()) + .build(); +// create a writer configuration +KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters("172.25.0.6") + .setUpsertWrite() + .setStrongConsistency() .build(); - ... 
env.fromCollection(books) - .output(new KuduOutputFormat<>("172.25.0.6", tableInfo)); + .output(new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe())); env.execute(); ``` @@ -81,18 +90,23 @@ env.setParallelism(PARALLELISM); KuduTableInfo tableInfo = KuduTableInfo.Builder .create("books") .createIfNotExist(true) - .replicas(1) - .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) - .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) - .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .replicas(3) + .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build()) + .addColumn(KuduColumnInfo.Builder.createString("title").build()) + .addColumn(KuduColumnInfo.Builder.createString("author").build()) + .addColumn(KuduColumnInfo.Builder.createDouble("price").build()) + .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build()) + .build(); +// create a writer configuration +KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters("172.25.0.6") + .setUpsertWrite() + .setStrongConsistency() .build(); - ... env.fromCollection(books) - .addSink(new KuduSink<>("172.25.0.6", tableInfo)); + .addSink(new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe())); env.execute(); ``` diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java new file mode 100644 index 0000000..3a35e6a --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.batch; + +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.connectors.kudu.connector.KuduFilterInfo; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit; +import org.apache.flink.connectors.kudu.connector.reader.KuduReader; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator; +import org.apache.flink.connectors.kudu.connector.serde.KuduDeserialization; +import org.apache.kudu.client.KuduException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class KuduInputFormat<OUT> extends RichInputFormat<OUT, KuduInputSplit> { + + private final Logger log = 
LoggerFactory.getLogger(getClass()); + + private final KuduReaderConfig readerConfig; + private final KuduTableInfo tableInfo; + private final KuduDeserialization<OUT> deserializer; + + private List<KuduFilterInfo> tableFilters; + private List<String> tableProjections; + + private boolean endReached; + + private transient KuduReader kuduReader; + private transient KuduReaderIterator resultIterator; + + public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer) { + this(readerConfig, tableInfo, deserializer, new ArrayList<>(), new ArrayList<>()); + } + public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer, List<KuduFilterInfo> tableFilters, List<String> tableProjections) { + + this.readerConfig = checkNotNull(readerConfig,"readerConfig could not be null"); + this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null"); + this.deserializer = checkNotNull(deserializer,"deserializer could not be null"); + this.tableFilters = checkNotNull(tableFilters,"tableFilters could not be null"); + this.tableProjections = checkNotNull(tableProjections,"tableProjections could not be null"); + + this.endReached = false; + } + + @Override + public void configure(Configuration parameters) { + } + + @Override + public void open(KuduInputSplit split) throws IOException { + endReached = false; + startKuduReader(); + + resultIterator = kuduReader.scanner(split.getScanToken()); + } + + private void startKuduReader() throws IOException { + if (kuduReader == null) { + kuduReader = new KuduReader(tableInfo, readerConfig, tableFilters, tableProjections); + } + } + + @Override + public void close() throws IOException { + if (resultIterator != null) { + try { + resultIterator.close(); + } catch (KuduException e) { + e.printStackTrace(); + } + } + if (kuduReader != null) { + kuduReader.close(); + } + } + + @Override + public BaseStatistics 
getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException { + startKuduReader(); + return kuduReader.createInputSplits(minNumSplits); + } + + @Override + public boolean reachedEnd() { + return endReached; + } + + @Override + public OUT nextRecord(OUT reuse) throws IOException { + // check that current iterator has next rows + if (this.resultIterator.hasNext()) { + KuduRow row = resultIterator.next(); + return deserializer.deserialize(row); + } else { + endReached = true; + return null; + } + } + +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java new file mode 100644 index 0000000..9d7d017 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.batch; + +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler; +import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriter; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class KuduOutputFormat<IN> extends RichOutputFormat<IN> implements CheckpointedFunction { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final KuduTableInfo tableInfo; + private final KuduWriterConfig writerConfig; + private final KuduFailureHandler failureHandler; + private final KuduSerialization<IN> serializer; + + private transient KuduWriter kuduWriter; + + public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer) { + this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler()); + } + + public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer, KuduFailureHandler failureHandler) { + this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null"); + this.writerConfig = checkNotNull(writerConfig,"config could not be null"); + this.serializer = 
checkNotNull(serializer,"serializer could not be null"); + this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null"); + } + + @Override + public void configure(Configuration parameters) { + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler); + + serializer.withSchema(kuduWriter.getSchema()); + } + + @Override + public void writeRecord(IN row) throws IOException { + final KuduRow kuduRow = serializer.serialize(row); + kuduWriter.write(kuduRow); + } + + @Override + public void close() throws IOException { + if (kuduWriter != null) { + kuduWriter.close(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + kuduWriter.flushAndCheckErrors(); + } + + @Override + public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { + + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java similarity index 98% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java index fa7472f..ff8a601 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java @@ -14,13 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.connector; +package org.apache.flink.connectors.kudu.connector; +import org.apache.flink.annotation.PublicEvolving; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Type; import java.io.Serializable; +@PublicEvolving public class KuduColumnInfo implements Serializable { private String name; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java similarity index 97% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java index 1a7582d..c37bc9a 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java @@ -14,15 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.connector; +package org.apache.flink.connectors.kudu.connector; +import org.apache.flink.annotation.PublicEvolving; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduPredicate; import java.util.List; - +@PublicEvolving public class KuduFilterInfo { private String column; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java similarity index 95% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java index 3c57a1b..af78361 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java @@ -14,8 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.connector; +package org.apache.flink.connectors.kudu.connector; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.types.Row; import java.lang.reflect.Field; @@ -26,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.stream.Stream; +@PublicEvolving public class KuduRow extends Row { private Map<String, Integer> rowNames; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java similarity index 97% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java index dfea382..eb63b3f 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java @@ -14,8 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.connector; +package org.apache.flink.connectors.kudu.connector; +import org.apache.flink.annotation.PublicEvolving; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; @@ -24,6 +25,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +@PublicEvolving public class KuduTableInfo implements Serializable { private static final Integer DEFAULT_REPLICAS = 1; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java new file mode 100644 index 0000000..7548033 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector.failure; + +import org.apache.kudu.client.RowError; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +public class DefaultKuduFailureHandler implements KuduFailureHandler { + @Override + public void onFailure(List<RowError> failure) throws IOException { + String errors = failure.stream() + .map(error -> error.toString() + System.lineSeparator()) + .collect(Collectors.joining()); + throw new IOException("Error while sending value. \n " + errors); + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java new file mode 100644 index 0000000..42de4f7 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector.failure; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.kudu.client.RowError; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +@PublicEvolving +public interface KuduFailureHandler extends Serializable { + + /** + * Handle a failed {@link List<RowError>}. + * + * @param failure the cause of failure + * @throws IOException if the sink should fail on this failure, the implementation should rethrow the throwable or a custom one + */ + void onFailure(List<RowError> failure) throws IOException; + +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java new file mode 100644 index 0000000..a809106 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector.reader; + +import org.apache.flink.core.io.LocatableInputSplit; + +public class KuduInputSplit extends LocatableInputSplit { + + private byte[] scanToken; + + /** + * Creates a new KuduInputSplit + * @param splitNumber the number of the input split + * @param hostnames The names of the hosts storing the data this input split refers to. + */ + public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) { + super(splitNumber, hostnames); + + this.scanToken = scanToken; + } + + public byte[] getScanToken() { + return scanToken; + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java new file mode 100644 index 0000000..9c6e790 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector.reader; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.annotation.Internal; +import org.apache.flink.connectors.kudu.connector.KuduFilterInfo; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Internal +public class KuduReader implements AutoCloseable { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final KuduTableInfo tableInfo; + private final KuduReaderConfig readerConfig; + private final List<KuduFilterInfo> tableFilters; + private final List<String> tableProjections; + + private transient KuduClient client; + private transient KuduSession session; + private transient KuduTable table; + + public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig) throws IOException { + this(tableInfo, readerConfig, new ArrayList<>(), new ArrayList<>()); + } + public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters) throws IOException { + this(tableInfo, readerConfig, tableFilters, new ArrayList<>()); + } + public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException { + this.tableInfo = tableInfo; + this.readerConfig = readerConfig; + this.tableFilters = tableFilters; + this.tableProjections = tableProjections; + + this.client = obtainClient(); + this.session = obtainSession(); + this.table = obtainTable(); + } + + private KuduClient obtainClient() { + return new KuduClient.KuduClientBuilder(readerConfig.getMasters()).build(); + } + + private KuduSession obtainSession() { + return client.newSession(); + } + + private KuduTable obtainTable() throws IOException { + String tableName = 
tableInfo.getName(); + if (client.tableExists(tableName)) { + return client.openTable(tableName); + } + if (tableInfo.createIfNotExist()) { + return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions()); + } + throw new UnsupportedOperationException("table not exists and is marketed to not be created"); + } + + public KuduReaderIterator scanner(byte[] token) throws IOException { + return new KuduReaderIterator(KuduScanToken.deserializeIntoScanner(token, client)); + } + + public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) { + KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table); + + if (CollectionUtils.isNotEmpty(tableProjections)) { + tokenBuilder.setProjectedColumnNames(tableProjections); + } + + if (CollectionUtils.isNotEmpty(tableFilters)) { + tableFilters.stream() + .map(filter -> filter.toPredicate(table.getSchema())) + .forEach(tokenBuilder::addPredicate); + } + + if (rowLimit !=null && rowLimit > 0) { + tokenBuilder.limit(rowLimit); + } + + return tokenBuilder.build(); + } + + public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException { + + List<KuduScanToken> tokens = scanTokens(tableFilters, tableProjections, readerConfig.getRowLimit()); + + KuduInputSplit[] splits = new KuduInputSplit[tokens.size()]; + + for (int i = 0; i < tokens.size(); i++) { + KuduScanToken token = tokens.get(i); + + List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size()); + + for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) { + locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort())); + } + + KuduInputSplit split = new KuduInputSplit( + token.serialize(), + i, + locations.toArray(new String[locations.size()]) + ); + splits[i] = split; + } + + if (splits.length < minNumSplits) { + log.warn(" The minimum desired number of splits with your configured parallelism level " + + 
"is {}. Current kudu splits = {}. {} instances will remain idle.", + minNumSplits, + splits.length, + (minNumSplits - splits.length) + ); + } + + return splits; + } + + /** + * Returns a endpoint url in the following format: <host>:<ip> + * + * @param host Hostname + * @param port Port + * @return Formatted URL + */ + private String getLocation(String host, Integer port) { + StringBuilder builder = new StringBuilder(); + builder.append(host).append(":").append(port); + return builder.toString(); + } + + @Override + public void close() throws IOException { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + log.error("Error while closing session.", e); + } + try { + if (client != null) { + client.close(); + } + } catch (Exception e) { + log.error("Error while closing client.", e); + } + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java new file mode 100644 index 0000000..6f5f079 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector.reader; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +@PublicEvolving +public class KuduReaderConfig implements Serializable { + + private final String masters; + private final Integer rowLimit; + + private KuduReaderConfig( + String masters, + Integer rowLimit) { + + this.masters = checkNotNull(masters, "Kudu masters cannot be null"); + this.rowLimit = checkNotNull(rowLimit, "Kudu rowLimit cannot be null");; + } + + public String getMasters() { + return masters; + } + + public Integer getRowLimit() { + return rowLimit; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("masters", masters) + .append("rowLimit", rowLimit) + .toString(); + } + + /** + * Builder for the {@link KuduReaderConfig}. 
+ */ + public static class Builder { + private String masters; + private Integer rowLimit = 0; + + private Builder(String masters) { + this.masters = masters; + } + + public static Builder setMasters(String masters) { + return new Builder(masters); + } + + public Builder setRowLimit(Integer rowLimit) { + this.rowLimit = rowLimit; + return this; + } + + public KuduReaderConfig build() { + return new KuduReaderConfig( + masters, + rowLimit); + } + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java new file mode 100644 index 0000000..4a8e69c --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java @@ -0,0 +1,112 @@ +/* + * Licensed serialize the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file serialize You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed serialize in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector.reader; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; + +@Internal +public class KuduReaderIterator { + + private KuduScanner scanner; + private RowResultIterator rowIterator; + + public KuduReaderIterator(KuduScanner scanner) throws KuduException { + this.scanner = scanner; + nextRows(); + } + + public void close() throws KuduException { + scanner.close(); + } + + public boolean hasNext() throws KuduException { + if (rowIterator.hasNext()) { + return true; + } else if (scanner.hasMoreRows()) { + nextRows(); + return true; + } else { + return false; + } + } + + public KuduRow next() { + RowResult row = this.rowIterator.next(); + return toKuduRow(row); + } + + private void nextRows() throws KuduException { + this.rowIterator = scanner.nextRows(); + } + + private KuduRow toKuduRow(RowResult row) { + Schema schema = row.getColumnProjection(); + + KuduRow values = new KuduRow(schema.getColumnCount()); + schema.getColumns().forEach(column -> { + String name = column.getName(); + int pos = schema.getColumnIndex(name); + if(row.isNull(name)) { + values.setField(pos, name, null); + } else { + Type type = column.getType(); + switch (type) { + case BINARY: + values.setField(pos, name, row.getBinary(name)); + break; + case STRING: + values.setField(pos, name, row.getString(name)); + break; + case BOOL: + values.setField(pos, name, row.getBoolean(name)); + break; + case DOUBLE: + values.setField(pos, name, row.getDouble(name)); + break; + case FLOAT: + values.setField(pos, name, row.getFloat(name)); + break; + case INT8: + values.setField(pos, name, row.getByte(name)); + break; + case INT16: + values.setField(pos, name, 
row.getShort(name)); + break; + case INT32: + values.setField(pos, name, row.getInt(name)); + break; + case INT64: + values.setField(pos, name, row.getLong(name)); + break; + case UNIXTIME_MICROS: + values.setField(pos, name, row.getLong(name) / 1000); + break; + default: + throw new IllegalArgumentException("Illegal var type: " + type); + } + } + }); + return values; + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java similarity index 90% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java index c12eb42..36584b5 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.serde; +package org.apache.flink.connectors.kudu.connector.serde; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduRow; import org.apache.kudu.Schema; public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java similarity index 88% copy from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java copy to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java index 355a516..190c4c7 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.serde; +package org.apache.flink.connectors.kudu.connector.serde; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduRow; import java.io.Serializable; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java similarity index 89% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java index 99db1dc..b13c59b 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.serde; +package org.apache.flink.connectors.kudu.connector.serde; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduRow; import org.apache.kudu.Schema; import java.io.Serializable; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java similarity index 95% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java index 1063aa2..bc57174 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.serde; +package org.apache.flink.connectors.kudu.connector.serde; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; -import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.kudu.Schema; import java.lang.reflect.Constructor; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java new file mode 100644 index 0000000..f4e2a8a --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler; +import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +@Internal +public class KuduWriter implements AutoCloseable { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final KuduTableInfo tableInfo; + private final KuduWriterConfig writerConfig; + private final KuduFailureHandler failureHandler; + + private transient KuduClient client; + private transient KuduSession session; + private transient KuduTable table; + + + public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig) throws IOException { + this (tableInfo, writerConfig, new DefaultKuduFailureHandler()); + } + public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduFailureHandler failureHandler) throws IOException { + this.tableInfo = tableInfo; + this.writerConfig = writerConfig; + this.failureHandler = failureHandler; + + this.client = obtainClient(); + this.session = obtainSession(); + this.table = obtainTable(); + } + + private KuduClient obtainClient() { + return new KuduClient.KuduClientBuilder(writerConfig.getMasters()).build(); + } + + private KuduSession obtainSession() { + KuduSession session = client.newSession(); + session.setFlushMode(writerConfig.getFlushMode()); + return session; + } + + private KuduTable obtainTable() throws IOException { + String tableName = tableInfo.getName(); + if 
(client.tableExists(tableName)) { + return client.openTable(tableName); + } + if (tableInfo.createIfNotExist()) { + return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions()); + } + throw new UnsupportedOperationException("table not exists and is marketed to not be created"); + } + + public Schema getSchema() { + return table.getSchema(); + } + + public void write(KuduRow row) throws IOException { + checkAsyncErrors(); + + final Operation operation = mapToOperation(row); + final OperationResponse response = session.apply(operation); + + checkErrors(response); + } + + public void flushAndCheckErrors() throws IOException { + checkAsyncErrors(); + flush(); + checkAsyncErrors(); + } + + @VisibleForTesting + public DeleteTableResponse deleteTable() throws IOException { + String tableName = table.getName(); + return client.deleteTable(tableName); + } + + @Override + public void close() throws IOException { + try { + flushAndCheckErrors(); + } finally { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + log.error("Error while closing session.", e); + } + try { + if (client != null) { + client.close(); + } + } catch (Exception e) { + log.error("Error while closing client.", e); + } + } + } + + private void flush() throws IOException { + session.flush(); + } + + private void checkErrors(OperationResponse response) throws IOException { + if (response != null && response.hasRowError()) { + failureHandler.onFailure(Arrays.asList(response.getRowError())); + } else { + checkAsyncErrors(); + } + } + + private void checkAsyncErrors() throws IOException { + if (session.countPendingErrors() == 0) return; + + List<RowError> errors = Arrays.asList(session.getPendingErrors().getRowErrors()); + failureHandler.onFailure(errors); + } + + private Operation mapToOperation(KuduRow row) { + final Operation operation = obtainOperation(); + final PartialRow partialRow = operation.getRow(); + + 
table.getSchema().getColumns().forEach(column -> { + String columnName = column.getName(); + Object value = row.getField(column.getName()); + + if (value == null) { + partialRow.setNull(columnName); + } else { + Type type = column.getType(); + switch (type) { + case STRING: + partialRow.addString(columnName, (String) value); + break; + case FLOAT: + partialRow.addFloat(columnName, (Float) value); + break; + case INT8: + partialRow.addByte(columnName, (Byte) value); + break; + case INT16: + partialRow.addShort(columnName, (Short) value); + break; + case INT32: + partialRow.addInt(columnName, (Integer) value); + break; + case INT64: + partialRow.addLong(columnName, (Long) value); + break; + case DOUBLE: + partialRow.addDouble(columnName, (Double) value); + break; + case BOOL: + partialRow.addBoolean(columnName, (Boolean) value); + break; + case UNIXTIME_MICROS: + //*1000 to correctly create date on kudu + partialRow.addLong(columnName, ((Long) value) * 1000); + break; + case BINARY: + partialRow.addBinary(columnName, (byte[]) value); + break; + default: + throw new IllegalArgumentException("Illegal var type: " + type); + } + } + }); + return operation; + } + + private Operation obtainOperation() { + switch (writerConfig.getWriteMode()) { + case INSERT: return table.newInsert(); + case UPDATE: return table.newUpdate(); + case UPSERT: return table.newUpsert(); + } + return table.newUpsert(); + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java new file mode 100644 index 0000000..13672d5 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector.writer; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.kudu.client.SessionConfiguration.FlushMode; + +@PublicEvolving +public class KuduWriterConfig implements Serializable { + + private final String masters; + private final FlushMode flushMode; + private final KuduWriterMode writeMode; + + private KuduWriterConfig( + String masters, + FlushMode flushMode, + KuduWriterMode writeMode) { + + this.masters = checkNotNull(masters, "Kudu masters cannot be null"); + this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null"); + this.writeMode = checkNotNull(writeMode, "Kudu write mode cannot be null"); + } + + public String getMasters() { + return masters; + } + + public KuduWriterMode getWriteMode() { + return writeMode; + } + + public FlushMode getFlushMode() { + return flushMode; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("masters", masters) + .append("flushMode", flushMode) + .append("writeMode", writeMode) + .toString(); + } + + /** + * Builder for the {@link KuduWriterConfig}. 
+ */ + public static class Builder { + private String masters; + private KuduWriterMode writeMode = KuduWriterMode.UPSERT; + private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND; + + private Builder(String masters) { + this.masters = masters; + } + + public static Builder setMasters(String masters) { + return new Builder(masters); + } + + public Builder setWriteMode(KuduWriterMode writeMode) { + this.writeMode = writeMode; + return this; + } + public Builder setUpsertWrite() { + return setWriteMode(KuduWriterMode.UPSERT); + } + public Builder setInsertWrite() { + return setWriteMode(KuduWriterMode.INSERT); + } + public Builder setUpdateWrite() { + return setWriteMode(KuduWriterMode.UPDATE); + } + + public Builder setConsistency(FlushMode flushMode) { + this.flushMode = flushMode; + return this; + } + public Builder setEventualConsistency() { + return setConsistency(FlushMode.AUTO_FLUSH_BACKGROUND); + } + public Builder setStrongConsistency() { + return setConsistency(FlushMode.AUTO_FLUSH_SYNC); + } + + public KuduWriterConfig build() { + return new KuduWriterConfig( + masters, + flushMode, + writeMode); + } + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java new file mode 100644 index 0000000..27b2ed3 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector.writer; + +import static org.apache.kudu.client.SessionConfiguration.*; + +public enum KuduWriterConsistency { + EVENTUAL(FlushMode.AUTO_FLUSH_BACKGROUND), + STRONG(FlushMode.AUTO_FLUSH_SYNC), + //CHECKPOINT(FlushMode.MANUAL_FLUSH) + ; + + public final FlushMode flushMode; + + KuduWriterConsistency(FlushMode flushMode) { + this.flushMode = flushMode; + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java similarity index 54% rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java index 355a516..8c9eab0 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java @@ -1,25 +1,23 @@ /* - * Licensed serialize the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
/**
 * Kind of Kudu operation the writer emits for each row.
 */
public enum KuduWriterMode {
    /** Insert only; fails on duplicate keys. */
    INSERT,
    /** Update only; fails on missing keys. */
    UPDATE,
    /** Insert-or-update; the writer's default. */
    UPSERT
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.streaming; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +@PublicEvolving +public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final KuduTableInfo tableInfo; + private final KuduWriterConfig writerConfig; + private final KuduFailureHandler failureHandler; + private final KuduSerialization<OUT> serializer; + + private transient KuduWriter kuduWriter; + + public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, 
KuduSerialization<OUT> serializer) { + this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler()); + } + + public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer, KuduFailureHandler failureHandler) { + this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null"); + this.writerConfig = checkNotNull(writerConfig,"config could not be null"); + this.serializer = checkNotNull(serializer,"serializer could not be null"); + this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null"); + } + + @Override + public void open(Configuration parameters) throws Exception { + kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler); + + serializer.withSchema(kuduWriter.getSchema()); + } + + @Override + public void invoke(OUT value) throws Exception { + final KuduRow kuduRow = serializer.serialize(value); + kuduWriter.write(kuduRow); + } + + @Override + public void close() throws Exception { + if (kuduWriter != null) { + kuduWriter.close(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + kuduWriter.flushAndCheckErrors(); + } + + @Override + public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { + } + +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java deleted file mode 100644 index fd126d0..0000000 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kudu; - -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.core.io.LocatableInputSplit; -import org.apache.flink.streaming.connectors.kudu.connector.*; -import org.apache.flink.util.Preconditions; -import org.apache.kudu.client.KuduException; -import org.apache.kudu.client.KuduScanToken; -import org.apache.kudu.client.LocatedTablet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.KuduInputSplit> { - - private String kuduMasters; - private KuduTableInfo tableInfo; - private List<KuduFilterInfo> tableFilters; - private List<String> tableProjections; - private Long rowsLimit; - private boolean endReached; - - private transient KuduConnector tableContext; - private transient KuduRowIterator resultIterator; - - private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class); - - public KuduInputFormat(String kuduMasters, KuduTableInfo tableInfo) { - 
Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); - this.kuduMasters = kuduMasters; - - Preconditions.checkNotNull(tableInfo,"tableInfo could not be null"); - this.tableInfo = tableInfo; - - this.endReached = false; - } - - public KuduInputFormat withTableFilters(KuduFilterInfo... tableFilters) { - return withTableFilters(Arrays.asList(tableFilters)); - } - - public KuduInputFormat withTableFilters(List<KuduFilterInfo> tableFilters) { - this.tableFilters = tableFilters; - return this; - } - - public KuduInputFormat withTableProjections(String... tableProjections) { - return withTableProjections(Arrays.asList(tableProjections)); - } - public KuduInputFormat withTableProjections(List<String> tableProjections) { - this.tableProjections = tableProjections; - return this; - } - - public KuduInputFormat withRowsLimit(Long rowsLimit) { - this.rowsLimit = rowsLimit; - return this; - } - - @Override - public void configure(Configuration parameters) { - - } - - @Override - public void open(KuduInputSplit split) throws IOException { - endReached = false; - startTableContext(); - - resultIterator = tableContext.scanner(split.getScanToken()); - } - - @Override - public void close() { - if (resultIterator != null) { - try { - resultIterator.close(); - } catch (KuduException e) { - e.printStackTrace(); - } - } - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return cachedStatistics; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) { - return new LocatableInputSplitAssigner(inputSplits); - } - - private void startTableContext() throws IOException { - if (tableContext == null) { - tableContext = new KuduConnector(kuduMasters, tableInfo); - } - } - - @Override - public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException { - startTableContext(); - Preconditions.checkNotNull(tableContext,"tableContext should not be null"); - - 
List<KuduScanToken> tokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit); - - KuduInputSplit[] splits = new KuduInputSplit[tokens.size()]; - - for (int i = 0; i < tokens.size(); i++) { - KuduScanToken token = tokens.get(i); - - List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size()); - - for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) { - locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort())); - } - - KuduInputSplit split = new KuduInputSplit( - token.serialize(), - i, - locations.toArray(new String[locations.size()]) - ); - splits[i] = split; - } - - if (splits.length < minNumSplits) { - LOG.warn(" The minimum desired number of splits with your configured parallelism level " + - "is {}. Current kudu splits = {}. {} instances will remain idle.", - minNumSplits, - splits.length, - (minNumSplits - splits.length) - ); - } - - return splits; - } - - @Override - public boolean reachedEnd() throws IOException { - return endReached; - } - - @Override - public KuduRow nextRecord(KuduRow reuse) throws IOException { - // check that current iterator has next rows - if (this.resultIterator.hasNext()) { - return resultIterator.next(); - } else { - endReached = true; - return null; - } - } - - /** - * Returns a endpoint url in the following format: <host>:<ip> - * - * @param host Hostname - * @param port Port - * @return Formatted URL - */ - private String getLocation(String host, Integer port) { - StringBuilder builder = new StringBuilder(); - builder.append(host).append(":").append(port); - return builder.toString(); - } - - - public class KuduInputSplit extends LocatableInputSplit { - - private byte[] scanToken; - - /** - * Creates a new KuduInputSplit - * @param splitNumber the number of the input split - * @param hostnames The names of the hosts storing the data this input split refers to. 
- */ - public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) { - super(splitNumber, hostnames); - - this.scanToken = scanToken; - } - - public byte[] getScanToken() { - return scanToken; - } - } -} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java deleted file mode 100644 index c1301da..0000000 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.kudu; - -import org.apache.flink.api.common.io.RichOutputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; -import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; -import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; -import org.apache.flink.util.Preconditions; -import org.apache.kudu.client.SessionConfiguration.FlushMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); - - private String kuduMasters; - private KuduTableInfo tableInfo; - private KuduConnector.Consistency consistency; - private KuduConnector.WriteMode writeMode; - - private KuduSerialization<OUT> serializer; - - private transient KuduConnector connector; - - - public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) { - Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); - this.kuduMasters = kuduMasters; - - Preconditions.checkNotNull(tableInfo,"tableInfo could not be null"); - this.tableInfo = tableInfo; - this.consistency = KuduConnector.Consistency.STRONG; - this.writeMode = KuduConnector.WriteMode.UPSERT; - this.serializer = serializer.withSchema(tableInfo.getSchema()); - } - - - public KuduOutputFormat<OUT> withEventualConsistency() { - this.consistency = KuduConnector.Consistency.EVENTUAL; - return this; - } - - public KuduOutputFormat<OUT> withStrongConsistency() { - this.consistency = KuduConnector.Consistency.STRONG; - return this; - } - - public KuduOutputFormat<OUT> withUpsertWriteMode() { - this.writeMode = 
KuduConnector.WriteMode.UPSERT; - return this; - } - - public KuduOutputFormat<OUT> withInsertWriteMode() { - this.writeMode = KuduConnector.WriteMode.INSERT; - return this; - } - - public KuduOutputFormat<OUT> withUpdateWriteMode() { - this.writeMode = KuduConnector.WriteMode.UPDATE; - return this; - } - - @Override - public void configure(Configuration parameters) { - - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - if (connector != null) return; - connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode,FlushMode.AUTO_FLUSH_SYNC); - serializer = serializer.withSchema(tableInfo.getSchema()); - } - - @Override - public void writeRecord(OUT row) throws IOException { - boolean response; - try { - KuduRow kuduRow = serializer.serialize(row); - response = connector.writeRow(kuduRow); - } catch (Exception e) { - throw new IOException(e.getLocalizedMessage(), e); - } - - if(!response) { - throw new IOException("error with some transaction"); - } - } - - @Override - public void close() throws IOException { - if (this.connector == null) return; - try { - this.connector.close(); - } catch (Exception e) { - throw new IOException(e.getLocalizedMessage(), e); - } - } -} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java deleted file mode 100644 index b6dd9c8..0000000 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kudu; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; -import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; -import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; -import org.apache.flink.util.Preconditions; -import org.apache.kudu.client.SessionConfiguration.FlushMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); - - private String kuduMasters; - private KuduTableInfo tableInfo; - private KuduConnector.Consistency consistency; - private KuduConnector.WriteMode writeMode; - private FlushMode flushMode; - - private 
KuduSerialization<OUT> serializer; - - private transient KuduConnector connector; - - public KuduSink(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) { - Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); - this.kuduMasters = kuduMasters; - - Preconditions.checkNotNull(tableInfo,"tableInfo could not be null"); - this.tableInfo = tableInfo; - this.consistency = KuduConnector.Consistency.STRONG; - this.writeMode = KuduConnector.WriteMode.UPSERT; - this.serializer = serializer.withSchema(tableInfo.getSchema()); - } - - public KuduSink<OUT> withEventualConsistency() { - this.consistency = KuduConnector.Consistency.EVENTUAL; - return this; - } - - public KuduSink<OUT> withStrongConsistency() { - this.consistency = KuduConnector.Consistency.STRONG; - return this; - } - - public KuduSink<OUT> withUpsertWriteMode() { - this.writeMode = KuduConnector.WriteMode.UPSERT; - return this; - } - - public KuduSink<OUT> withInsertWriteMode() { - this.writeMode = KuduConnector.WriteMode.INSERT; - return this; - } - - public KuduSink<OUT> withUpdateWriteMode() { - this.writeMode = KuduConnector.WriteMode.UPDATE; - return this; - } - - public KuduSink<OUT> withSyncFlushMode() { - this.flushMode = FlushMode.AUTO_FLUSH_SYNC; - return this; - } - - public KuduSink<OUT> withAsyncFlushMode() { - this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND; - return this; - } - - @Override - public void open(Configuration parameters) throws IOException { - if (this.connector != null) return; - this.connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, getflushMode()); - this.serializer.withSchema(tableInfo.getSchema()); - } - - /** - * If flink checkpoint is disable,synchronously write data to kudu. - * <p>If flink checkpoint is enable, asynchronously write data to kudu by default. - * - * <p>(Note: async may result in out-of-order writes to Kudu. 
- * you also can change to sync by explicitly calling {@link KuduSink#withSyncFlushMode()} when initializing KuduSink. ) - * - * @return flushMode - */ - private FlushMode getflushMode() { - FlushMode flushMode = FlushMode.AUTO_FLUSH_SYNC; - boolean enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled(); - if(enableCheckpoint && this.flushMode == null) { - flushMode = FlushMode.AUTO_FLUSH_BACKGROUND; - } - if(enableCheckpoint && this.flushMode != null) { - flushMode = this.flushMode; - } - return flushMode; - } - - @Override - public void invoke(OUT row) throws Exception { - KuduRow kuduRow = serializer.serialize(row); - boolean response = connector.writeRow(kuduRow); - - if(!response) { - throw new IOException("error with some transaction"); - } - } - - @Override - public void close() throws Exception { - if (this.connector == null) return; - try { - this.connector.close(); - } catch (Exception e) { - throw new IOException(e.getLocalizedMessage(), e); - } - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotting state {} ...", context.getCheckpointId()); - } - this.connector.flush(); - } -} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java deleted file mode 100644 index d45886c..0000000 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kudu.connector; - -import com.stumbleupon.async.Callback; -import com.stumbleupon.async.Deferred; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.flink.api.common.time.Time; -import org.apache.kudu.client.*; -import org.apache.kudu.client.SessionConfiguration.FlushMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -public class KuduConnector implements AutoCloseable { - - private final Logger LOG = LoggerFactory.getLogger(this.getClass()); - - private Callback<Boolean, OperationResponse> defaultCB; - - public enum Consistency {EVENTUAL, STRONG}; - public enum WriteMode {INSERT,UPDATE,UPSERT} - - private AsyncKuduClient client; - private KuduTable table; - private AsyncKuduSession session; - - private Consistency consistency; - private WriteMode writeMode; - - private static AtomicInteger pendingTransactions = new AtomicInteger(); - private static AtomicBoolean errorTransactions = new AtomicBoolean(false); - - public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException { - this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT,FlushMode.AUTO_FLUSH_SYNC); - } - - public 
KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode,FlushMode flushMode) throws IOException { - this.client = client(kuduMasters); - this.table = table(tableInfo); - this.session = client.newSession(); - this.consistency = consistency; - this.writeMode = writeMode; - this.defaultCB = new ResponseCallback(); - this.session.setFlushMode(flushMode); - } - - private AsyncKuduClient client(String kuduMasters) { - return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasters).build(); - } - - private KuduTable table(KuduTableInfo infoTable) throws IOException { - KuduClient syncClient = client.syncClient(); - - String tableName = infoTable.getName(); - if (syncClient.tableExists(tableName)) { - return syncClient.openTable(tableName); - } - if (infoTable.createIfNotExist()) { - return syncClient.createTable(tableName, infoTable.getSchema(), infoTable.getCreateTableOptions()); - } - throw new UnsupportedOperationException("table not exists and is marketed to not be created"); - } - - public boolean deleteTable() throws IOException { - String tableName = table.getName(); - client.syncClient().deleteTable(tableName); - return true; - } - - public KuduRowIterator scanner(byte[] token) throws IOException { - return new KuduRowIterator(KuduScanToken.deserializeIntoScanner(token, client.syncClient())); - } - - public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) { - KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.syncClient().newScanTokenBuilder(table); - - if (CollectionUtils.isNotEmpty(tableProjections)) { - tokenBuilder.setProjectedColumnNames(tableProjections); - } - - if (CollectionUtils.isNotEmpty(tableFilters)) { - tableFilters.stream() - .map(filter -> filter.toPredicate(table.getSchema())) - .forEach(tokenBuilder::addPredicate); - } - - if (rowLimit !=null && rowLimit > 0) { - tokenBuilder.limit(rowLimit); - } - - return tokenBuilder.build(); - 
} - - public boolean writeRow(KuduRow row) throws Exception { - final Operation operation = KuduMapper.toOperation(table, writeMode, row); - - Deferred<OperationResponse> response = session.apply(operation); - - if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) { - pendingTransactions.incrementAndGet(); - response.addCallback(defaultCB); - } else { - processResponse(response.join()); - } - - return !errorTransactions.get(); - - } - - @Override - public void close() throws Exception { - while(pendingTransactions.get() > 0) { - LOG.info("sleeping {}s by pending transactions", pendingTransactions.get()); - Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds()); - } - - if (session == null) return; - session.close(); - - if (client == null) return; - client.close(); - } - - public void flush(){ - this.session.flush(); - } - - private class ResponseCallback implements Callback<Boolean, OperationResponse> { - @Override - public Boolean call(OperationResponse operationResponse) { - pendingTransactions.decrementAndGet(); - processResponse(operationResponse); - return errorTransactions.get(); - } - } - - protected void processResponse(OperationResponse operationResponse) { - if (operationResponse == null) return; - - if (operationResponse.hasRowError()) { - logResponseError(operationResponse.getRowError()); - errorTransactions.set(true); - } - } - - private void logResponseError(RowError error) { - LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString()); - } - -} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java deleted file mode 100644 index 86b683f..0000000 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kudu.connector; - - -import org.apache.kudu.Schema; -import org.apache.kudu.Type; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Operation; -import org.apache.kudu.client.PartialRow; -import org.apache.kudu.client.RowResult; - -final class KuduMapper { - - private KuduMapper() { } - - static KuduRow toKuduRow(RowResult row) { - Schema schema = row.getColumnProjection(); - - KuduRow values = new KuduRow(schema.getColumnCount()); - schema.getColumns().forEach(column -> { - String name = column.getName(); - int pos = schema.getColumnIndex(name); - if(row.isNull(name)) { - values.setField(pos, name, null); - } else { - Type type = column.getType(); - switch (type) { - case BINARY: - values.setField(pos, name, row.getBinary(name)); - break; - case STRING: - values.setField(pos, name, row.getString(name)); - break; - case BOOL: - values.setField(pos, name, row.getBoolean(name)); - break; - case DOUBLE: - values.setField(pos, name, row.getDouble(name)); - break; - case FLOAT: - values.setField(pos, name, row.getFloat(name)); - break; - case INT8: - values.setField(pos, name, row.getByte(name)); - break; - case INT16: - values.setField(pos, 
name, row.getShort(name)); - break; - case INT32: - values.setField(pos, name, row.getInt(name)); - break; - case INT64: - values.setField(pos, name, row.getLong(name)); - break; - case UNIXTIME_MICROS: - values.setField(pos, name, row.getLong(name) / 1000); - break; - default: - throw new IllegalArgumentException("Illegal var type: " + type); - } - } - }); - return values; - } - - - static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) { - final Operation operation = toOperation(table, writeMode); - final PartialRow partialRow = operation.getRow(); - - table.getSchema().getColumns().forEach(column -> { - String columnName = column.getName(); - Object value = row.getField(column.getName()); - - if (value == null) { - partialRow.setNull(columnName); - } else { - Type type = column.getType(); - switch (type) { - case STRING: - partialRow.addString(columnName, (String) value); - break; - case FLOAT: - partialRow.addFloat(columnName, (Float) value); - break; - case INT8: - partialRow.addByte(columnName, (Byte) value); - break; - case INT16: - partialRow.addShort(columnName, (Short) value); - break; - case INT32: - partialRow.addInt(columnName, (Integer) value); - break; - case INT64: - partialRow.addLong(columnName, (Long) value); - break; - case DOUBLE: - partialRow.addDouble(columnName, (Double) value); - break; - case BOOL: - partialRow.addBoolean(columnName, (Boolean) value); - break; - case UNIXTIME_MICROS: - //*1000 to correctly create date on kudu - partialRow.addLong(columnName, ((Long) value) * 1000); - break; - case BINARY: - partialRow.addBinary(columnName, (byte[]) value); - break; - default: - throw new IllegalArgumentException("Illegal var type: " + type); - } - } - }); - return operation; - } - - static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) { - switch (writeMode) { - case INSERT: return table.newInsert(); - case UPDATE: return table.newUpdate(); - case UPSERT: return 
table.newUpsert(); - } - return table.newUpsert(); - } - -} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java deleted file mode 100644 index 46cbff1..0000000 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed serialize the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file serialize You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed serialize in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.kudu.connector; - -import org.apache.kudu.client.KuduException; -import org.apache.kudu.client.KuduScanner; -import org.apache.kudu.client.RowResult; -import org.apache.kudu.client.RowResultIterator; - -public class KuduRowIterator { - - private KuduScanner scanner; - private RowResultIterator rowIterator; - - public KuduRowIterator(KuduScanner scanner) throws KuduException { - this.scanner = scanner; - nextRows(); - } - - public void close() throws KuduException { - scanner.close(); - } - - public boolean hasNext() throws KuduException { - if (rowIterator.hasNext()) { - return true; - } else if (scanner.hasMoreRows()) { - nextRows(); - return true; - } else { - return false; - } - } - - public KuduRow next() { - RowResult row = this.rowIterator.next(); - return KuduMapper.toKuduRow(row); - } - - private void nextRows() throws KuduException { - this.rowIterator = scanner.nextRows(); - } -} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java similarity index 61% rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java index 041b77e..e22f40e 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java @@ -14,35 +14,38 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu; - -import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; -import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +package org.apache.flink.connectors.kudu.batch; + +import org.apache.flink.connectors.kudu.connector.KuduDatabase; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig; +import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; - -public class KuduInputFormatTest extends KuduDatabase { +class KuduInputFormatTest extends KuduDatabase { @Test - public void testInvalidKuduMaster() throws IOException { + void testInvalidKuduMaster() { KuduTableInfo tableInfo = booksTableInfo("books",false); - Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(null, tableInfo)); + Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(null, tableInfo, new DefaultSerDe())); } @Test - public void testInvalidTableInfo() throws IOException { + void testInvalidTableInfo() { String masterAddresses = harness.getMasterAddressesAsString(); - Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(masterAddresses, null)); + KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); + Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(readerConfig, null, new DefaultSerDe())); } @Test - public void testInputFormat() throws Exception { + void testInputFormat() throws Exception { 
KuduTableInfo tableInfo = booksTableInfo("books",true); setUpDatabase(tableInfo); @@ -53,7 +56,7 @@ public class KuduInputFormatTest extends KuduDatabase { } @Test - public void testInputFormatWithProjection() throws Exception { + void testInputFormatWithProjection() throws Exception { KuduTableInfo tableInfo = booksTableInfo("books",true); setUpDatabase(tableInfo); @@ -68,14 +71,14 @@ public class KuduInputFormatTest extends KuduDatabase { } - public static List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception { + private List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception { String masterAddresses = harness.getMasterAddressesAsString(); - KuduInputFormat inputFormat = new KuduInputFormat(masterAddresses, tableInfo) - .withTableProjections(fieldProjection); + KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); + KuduInputFormat<KuduRow> inputFormat = new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe(), new ArrayList<>(), Arrays.asList(fieldProjection)); - KuduInputFormat.KuduInputSplit[] splits = inputFormat.createInputSplits(1); + KuduInputSplit[] splits = inputFormat.createInputSplits(1); List<KuduRow> rows = new ArrayList<>(); - for (KuduInputFormat.KuduInputSplit split : splits) { + for (KuduInputSplit split : splits) { inputFormat.open(split); while(!inputFormat.reachedEnd()) { KuduRow row = inputFormat.nextRecord(new KuduRow(5)); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java similarity index 63% rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java index 4e91310..963a8c0 100644 --- 
a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java @@ -14,48 +14,54 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.connectors.kudu; +package org.apache.flink.connectors.kudu.batch; -import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; -import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; -import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe; +import org.apache.flink.connectors.kudu.connector.KuduDatabase; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.List; import java.util.UUID; -public class KuduOuputFormatTest extends KuduDatabase { +class KuduOuputFormatTest extends KuduDatabase { @Test - public void testInvalidKuduMaster() throws IOException { + void testInvalidKuduMaster() { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe())); } @Test - public void testInvalidTableInfo() throws IOException { + void testInvalidTableInfo() { String masterAddresses = harness.getMasterAddressesAsString(); - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe())); + KuduWriterConfig writerConfig = 
KuduWriterConfig.Builder.setMasters(masterAddresses).build(); + Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, new DefaultSerDe())); } @Test - public void testNotTableExist() throws IOException { + void testNotTableExist() { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); + KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe()); Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1)); } @Test - public void testOutputWithStrongConsistency() throws Exception { + void testOutputWithStrongConsistency() throws Exception { String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()) - .withStrongConsistency(); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters(masterAddresses) + .setStrongConsistency() + .build(); + KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe()); + outputFormat.open(0,1); for (KuduRow kuduRow : booksDataRow()) { @@ -63,19 +69,23 @@ public class KuduOuputFormatTest extends KuduDatabase { } outputFormat.close(); - List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo); + List<KuduRow> rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); cleanDatabase(tableInfo); } @Test - public void testOutputWithEventualConsistency() throws Exception { + void testOutputWithEventualConsistency() throws Exception { String masterAddresses = 
harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()) - .withEventualConsistency(); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters(masterAddresses) + .setEventualConsistency() + .build(); + KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe()); + outputFormat.open(0,1); for (KuduRow kuduRow : booksDataRow()) { @@ -87,7 +97,7 @@ public class KuduOuputFormatTest extends KuduDatabase { outputFormat.close(); - List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo); + List<KuduRow> rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); cleanDatabase(tableInfo); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java similarity index 64% rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java index d22203d..3d02a1d 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java @@ -14,8 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kudu.connector; +package org.apache.flink.connectors.kudu.connector; +import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.connectors.kudu.connector.reader.KuduReader; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriter; import org.apache.kudu.Type; import org.apache.kudu.test.KuduTestHarness; import org.junit.Rule; @@ -23,6 +29,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -33,7 +40,7 @@ public class KuduDatabase { @Rule public static KuduTestHarness harness = new KuduTestHarness(); - protected static final Object[][] booksTableData = { + private static final Object[][] booksTableData = { {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11}, {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22}, {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33}, @@ -68,17 +75,19 @@ public class KuduDatabase { .collect(Collectors.toList()); } - public void setUpDatabase(KuduTableInfo tableInfo) { + protected void setUpDatabase(KuduTableInfo tableInfo) { try { String masterAddresses = harness.getMasterAddressesAsString(); - KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); + KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig); booksDataRow().forEach(row -> { try { - tableContext.writeRow(row); + kuduWriter.write(row); }catch (Exception e) { e.printStackTrace(); } 
}); + kuduWriter.close(); } catch (Exception e) { Assertions.fail(); } @@ -87,11 +96,34 @@ public class KuduDatabase { protected void cleanDatabase(KuduTableInfo tableInfo) { try { String masterAddresses = harness.getMasterAddressesAsString(); - KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo); - tableContext.deleteTable(); - tableContext.close(); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); + KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig); + kuduWriter.deleteTable(); + kuduWriter.close(); } catch (Exception e) { Assertions.fail(); } } + + protected List<KuduRow> readRows(KuduTableInfo tableInfo) throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); + KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); + KuduReader reader = new KuduReader(tableInfo, readerConfig); + + KuduInputSplit[] splits = reader.createInputSplits(1); + List<KuduRow> rows = new ArrayList<>(); + for (KuduInputSplit split : splits) { + KuduReaderIterator resultIterator = reader.scanner(split.getScanToken()); + while(resultIterator.hasNext()) { + KuduRow row = resultIterator.next(); + if(row != null) { + rows.add(row); + } + } + } + reader.close(); + + return rows; + } + } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java similarity index 87% rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java index afe57ca..6057113 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java +++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.connectors.kudu.serde; +package org.apache.flink.connectors.kudu.connector.serde; -import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; -import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.KuduColumnInfo; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.kudu.Type; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java new file mode 100644 index 0000000..ea49a91 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.streaming; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.connectors.kudu.connector.KuduColumnInfo; +import org.apache.flink.connectors.kudu.connector.KuduDatabase; +import org.apache.flink.connectors.kudu.connector.KuduRow; +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe; +import org.apache.kudu.Type; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.List; +import java.util.UUID; + +class KuduSinkTest extends KuduDatabase { + + private static StreamingRuntimeContext context; + + @BeforeAll + static void start() { + context = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(context.isCheckpointingEnabled()).thenReturn(true); + } + + @Test + void testInvalidKuduMaster() { + KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); + Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo, new DefaultSerDe())); + } + + @Test + void testInvalidTableInfo() { + String masterAddresses = harness.getMasterAddressesAsString(); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); + Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new DefaultSerDe())); + } + + @Test + void testNotTableExist() { + String masterAddresses = harness.getMasterAddressesAsString(); + KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); + KuduWriterConfig writerConfig = 
KuduWriterConfig.Builder.setMasters(masterAddresses).build(); + KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()); + + sink.setRuntimeContext(context); + Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration())); + } + + @Test + void testOutputWithStrongConsistency() throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); + + KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters(masterAddresses) + .setStrongConsistency() + .build(); + KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()); + + sink.setRuntimeContext(context); + sink.open(new Configuration()); + + for (KuduRow kuduRow : booksDataRow()) { + sink.invoke(kuduRow); + } + sink.close(); + + List<KuduRow> rows = readRows(tableInfo); + Assertions.assertEquals(5, rows.size()); + + } + + @Test + void testOutputWithEventualConsistency() throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); + + KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters(masterAddresses) + .setEventualConsistency() + .build(); + KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()); + + sink.setRuntimeContext(context); + sink.open(new Configuration()); + + for (KuduRow kuduRow : booksDataRow()) { + sink.invoke(kuduRow); + } + + // sleep to allow eventual consistency to finish + Thread.sleep(1000); + + sink.close(); + + List<KuduRow> rows = readRows(tableInfo); + Assertions.assertEquals(5, rows.size()); + } + + + @Test + void testSpeed() throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); + + KuduTableInfo tableInfo = KuduTableInfo.Builder + .create("test_speed") + .createIfNotExist(true) + .replicas(3) + 
.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) + .addColumn(KuduColumnInfo.Builder.create("uuid", Type.STRING).build()) + .build(); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters(masterAddresses) + .setEventualConsistency() + .build(); + KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()); + + sink.setRuntimeContext(context); + sink.open(new Configuration()); + + int totalRecords = 100000; + for (int i=0; i < totalRecords; i++) { + KuduRow kuduRow = new KuduRow(2); + kuduRow.setField(0, "id", i); + kuduRow.setField(1, "uuid", UUID.randomUUID().toString()); + sink.invoke(kuduRow); + } + + // sleep to allow eventual consistency to finish + Thread.sleep(1000); + + sink.close(); + + List<KuduRow> rows = readRows(tableInfo); + Assertions.assertEquals(totalRecords, rows.size()); + } + +} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java deleted file mode 100644 index 225bf7c..0000000 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kudu; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase; -import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; -import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; -import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.List; -import java.util.UUID; - - -public class KuduSinkTest extends KuduDatabase { - - private static StreamingRuntimeContext context; - - @BeforeAll - public static void start() { - context = Mockito.mock(StreamingRuntimeContext.class); - Mockito.when(context.isCheckpointingEnabled()).thenReturn(true); - } - - @Test - public void testInvalidKuduMaster() throws IOException { - KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe())); - } - - @Test - public void testInvalidTableInfo() throws IOException { - String masterAddresses = harness.getMasterAddressesAsString(); - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe())); - } - - @Test - public void testNotTableExist() throws IOException { - String masterAddresses = harness.getMasterAddressesAsString(); - KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()); - sink.setRuntimeContext(context); - 
Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration())); - } - - @Test - public void testOutputWithStrongConsistency() throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); - - KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) - .withStrongConsistency(); - sink.setRuntimeContext(context); - sink.open(new Configuration()); - - for (KuduRow kuduRow : booksDataRow()) { - sink.invoke(kuduRow); - } - sink.close(); - - List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo); - Assertions.assertEquals(5, rows.size()); - - } - - @Test - public void testOutputWithEventualConsistency() throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); - - KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) - .withEventualConsistency(); - sink.setRuntimeContext(context); - sink.open(new Configuration()); - - for (KuduRow kuduRow : booksDataRow()) { - sink.invoke(kuduRow); - } - - // sleep to allow eventual consistency to finish - Thread.sleep(1000); - - sink.close(); - - List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo); - Assertions.assertEquals(5, rows.size()); - } - -}