This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit 247dfe99486f04622b4ee7d83fa51ef253aaa75d Author: eskabetxe <b...@boto.pro> AuthorDate: Tue Jan 15 13:05:29 2019 +0100 [BAHIR-180] Improve eventual consistence for Kudu connector Closes #40 --- flink-connector-kudu/pom.xml | 2 +- .../streaming/connectors/kudu/KuduInputFormat.java | 27 ++--- .../connectors/kudu/KuduOutputFormat.java | 36 +++--- .../flink/streaming/connectors/kudu/KuduSink.java | 34 +++--- .../connectors/kudu/connector/KuduColumnInfo.java | 50 ++++++++ .../connectors/kudu/connector/KuduConnector.java | 81 ++++++++----- .../connectors/kudu/connector/KuduMapper.java | 59 +++++---- .../connectors/kudu/connector/KuduRow.java | 72 ++--------- .../connectors/kudu/connector/KuduRowIterator.java | 57 +++++++++ .../connectors/kudu/serde/DefaultSerDe.java | 39 ++++++ .../connectors/kudu/serde/KuduDeserialization.java | 25 ++++ .../connectors/kudu/serde/KuduSerialization.java | 28 +++++ .../streaming/connectors/kudu/serde/PojoSerDe.java | 134 +++++++++++++++++++++ .../connectors/kudu/KuduOuputFormatTest.java | 11 +- .../streaming/connectors/kudu/KuduSinkTest.java | 11 +- .../connectors/kudu/connector/KuduDatabase.java | 2 +- .../connectors/kudu/serde/PojoSerDeTest.java | 57 +++++++++ 17 files changed, 543 insertions(+), 182 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 5540fc1..61ab4a6 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -30,7 +30,7 @@ <packaging>jar</packaging> <properties> - <kudu.version>1.8.0</kudu.version> + <kudu.version>1.7.1</kudu.version> <junit.groups>!DockerTest</junit.groups> </properties> diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java index 617e317..fd126d0 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java +++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java @@ -24,7 +24,9 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.LocatableInputSplit; import org.apache.flink.streaming.connectors.kudu.connector.*; import org.apache.flink.util.Preconditions; -import org.apache.kudu.client.*; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanToken; +import org.apache.kudu.client.LocatedTablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +45,7 @@ public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.Ku private boolean endReached; private transient KuduConnector tableContext; - private transient KuduScanner scanner; - private transient RowResultIterator resultIterator; + private transient KuduRowIterator resultIterator; private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class); @@ -90,15 +91,14 @@ public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.Ku endReached = false; startTableContext(); - scanner = tableContext.scanner(split.getScanToken()); - resultIterator = scanner.nextRows(); + resultIterator = tableContext.scanner(split.getScanToken()); } @Override public void close() { - if (scanner != null) { + if (resultIterator != null) { try { - scanner.close(); + resultIterator.close(); } catch (KuduException e) { e.printStackTrace(); } @@ -168,18 +168,11 @@ public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.Ku public KuduRow nextRecord(KuduRow reuse) throws IOException { // check that current iterator has next rows if (this.resultIterator.hasNext()) { - RowResult row = this.resultIterator.next(); - return KuduMapper.toKuduRow(row); - } - // if not, check that current scanner has more iterators - else if (scanner.hasMoreRows()) { - this.resultIterator = scanner.nextRows(); - return nextRecord(reuse); - } - else { + return resultIterator.next(); 
+ } else { endReached = true; + return null; } - return null; } /** diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java index 5c23f36..9d12710 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java @@ -16,18 +16,19 @@ */ package org.apache.flink.streaming.connectors.kudu; -import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> { +public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> { private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); @@ -36,10 +37,12 @@ public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> private KuduConnector.Consistency consistency; private KuduConnector.WriteMode writeMode; - private transient KuduConnector tableContext; + private KuduSerialization<OUT> serializer; + private transient KuduConnector connector; - public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) { + + public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) { Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be 
null"); this.kuduMasters = kuduMasters; @@ -47,8 +50,10 @@ public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> this.tableInfo = tableInfo; this.consistency = KuduConnector.Consistency.STRONG; this.writeMode = KuduConnector.WriteMode.UPSERT; + this.serializer = serializer.withSchema(tableInfo.getSchema()); } + public KuduOutputFormat<OUT> withEventualConsistency() { this.consistency = KuduConnector.Consistency.EVENTUAL; return this; @@ -81,28 +86,31 @@ public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> @Override public void open(int taskNumber, int numTasks) throws IOException { - startTableContext(); - } - - private void startTableContext() throws IOException { - if (tableContext != null) return; - tableContext = new KuduConnector(kuduMasters, tableInfo); + if (connector != null) return; + connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode); + serializer = serializer.withSchema(tableInfo.getSchema()); } @Override - public void writeRecord(OUT kuduRow) throws IOException { + public void writeRecord(OUT row) throws IOException { + boolean response; try { - tableContext.writeRow(kuduRow, consistency, writeMode); + KuduRow kuduRow = serializer.serialize(row); + response = connector.writeRow(kuduRow); } catch (Exception e) { throw new IOException(e.getLocalizedMessage(), e); } + + if(!response) { + throw new IOException("error with some transaction"); + } } @Override public void close() throws IOException { - if (this.tableContext == null) return; + if (this.connector == null) return; try { - this.tableContext.close(); + this.connector.close(); } catch (Exception e) { throw new IOException(e.getLocalizedMessage(), e); } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java index 120d5c5..53cf249 100644 --- 
a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java @@ -21,13 +21,14 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> { +public class KuduSink<OUT> extends RichSinkFunction<OUT> { private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); @@ -36,10 +37,11 @@ public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> { private KuduConnector.Consistency consistency; private KuduConnector.WriteMode writeMode; - private transient KuduConnector tableContext; + private KuduSerialization<OUT> serializer; + private transient KuduConnector connector; - public KuduSink(String kuduMasters, KuduTableInfo tableInfo) { + public KuduSink(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) { Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); this.kuduMasters = kuduMasters; @@ -47,6 +49,7 @@ public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> { this.tableInfo = tableInfo; this.consistency = KuduConnector.Consistency.STRONG; this.writeMode = KuduConnector.WriteMode.UPSERT; + this.serializer = serializer.withSchema(tableInfo.getSchema()); } public KuduSink<OUT> withEventualConsistency() { @@ -76,29 +79,26 @@ public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> { @Override public void open(Configuration 
parameters) throws IOException { - startTableContext(); + if (connector != null) return; + connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode); + serializer.withSchema(tableInfo.getSchema()); } - private void startTableContext() throws IOException { - if (tableContext != null) return; - tableContext = new KuduConnector(kuduMasters, tableInfo); - } - - @Override - public void invoke(OUT kuduRow) throws Exception { - try { - tableContext.writeRow(kuduRow, consistency, writeMode); - } catch (Exception e) { - throw new IOException(e.getLocalizedMessage(), e); + public void invoke(OUT row) throws Exception { + KuduRow kuduRow = serializer.serialize(row); + boolean response = connector.writeRow(kuduRow); + + if(!response) { + throw new IOException("error with some transaction"); } } @Override public void close() throws Exception { - if (this.tableContext == null) return; + if (this.connector == null) return; try { - this.tableContext.close(); + this.connector.close(); } catch (Exception e) { throw new IOException(e.getLocalizedMessage(), e); } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java index 4dfc0b8..fa7472f 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java @@ -80,22 +80,72 @@ public class KuduColumnInfo implements Serializable { public static Builder create(String name, Type type) { return new Builder(name, type); } + public static Builder createByte(String name) { + return create(name, Type.INT8); + } + public static Builder createShort(String name) { + return create(name, Type.INT16); + } + public static Builder createInteger(String name) { + return create(name, Type.INT32); + 
} + public static Builder createLong(String name) { + return create(name, Type.INT64); + } + public static Builder createDouble(String name) { + return create(name, Type.DOUBLE); + } + public static Builder createFloat(String name) { + return create(name, Type.FLOAT); + } + public static Builder createString(String name) { + return create(name, Type.STRING); + } + public static Builder createBool(String name) { + return create(name, Type.BOOL); + } + public static Builder createByteArray(String name) { + return create(name, Type.BINARY); + } + public static Builder createUnixTime(String name) { + return create(name, Type.UNIXTIME_MICROS); + } + + public Builder asKey() { + return key(true); + } public Builder key(boolean key) { this.column.key = key; return this; } + public Builder asRangeKey() { + return rangeKey(true); + } + public Builder rangeKey(boolean rangeKey) { this.column.rangeKey = rangeKey; return this; } + public Builder asHashKey() { + return hashKey(true); + } + public Builder hashKey(boolean hashKey) { this.column.hashKey = hashKey; return this; } + public Builder asNullable() { + return nullable(true); + } + + public Builder asNotNullable() { + return nullable(false); + } + public Builder nullable(boolean nullable) { this.column.nullable = nullable; return this; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java index 0e2e6bc..a3851c4 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java @@ -17,27 +17,46 @@ package org.apache.flink.streaming.connectors.kudu.connector; import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; import 
org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.common.time.Time; import org.apache.kudu.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class KuduConnector implements AutoCloseable { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); + private Callback<Boolean, OperationResponse> defaultCB; + public enum Consistency {EVENTUAL, STRONG}; public enum WriteMode {INSERT,UPDATE,UPSERT} private AsyncKuduClient client; private KuduTable table; + private Consistency consistency; + private WriteMode writeMode; + + private static AtomicInteger pendingTransactions = new AtomicInteger(); + private static AtomicBoolean errorTransactions = new AtomicBoolean(false); + public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException { - client = client(kuduMasters); - table = table(tableInfo); + this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT); + } + + public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode) throws IOException { + this.client = client(kuduMasters); + this.table = table(tableInfo); + this.consistency = consistency; + this.writeMode = writeMode; + this.defaultCB = new ResponseCallback(); } private AsyncKuduClient client(String kuduMasters) { @@ -63,8 +82,8 @@ public class KuduConnector implements AutoCloseable { return true; } - public KuduScanner scanner(byte[] token) throws IOException { - return KuduScanToken.deserializeIntoScanner(token, client.syncClient()); + public KuduRowIterator scanner(byte[] token) throws IOException { + return new KuduRowIterator(KuduScanToken.deserializeIntoScanner(token, client.syncClient())); } public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> 
tableProjections, Long rowLimit) { @@ -82,52 +101,60 @@ public class KuduConnector implements AutoCloseable { if (rowLimit !=null && rowLimit > 0) { tokenBuilder.limit(rowLimit); - // FIXME: https://issues.apache.org/jira/browse/KUDU-16 - // Server side limit() operator for java-based scanners are not implemented yet } return tokenBuilder.build(); } - public boolean writeRow(KuduRow row, Consistency consistency, WriteMode writeMode) throws Exception { + public boolean writeRow(KuduRow row) throws Exception { final Operation operation = KuduMapper.toOperation(table, writeMode, row); - if (Consistency.EVENTUAL.equals(consistency)) { - AsyncKuduSession session = client.newSession(); - session.apply(operation); - session.flush(); - return session.close().addCallback(new ResponseCallback()).join(); + AsyncKuduSession session = client.newSession(); + Deferred<OperationResponse> response = session.apply(operation); + + if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) { + pendingTransactions.incrementAndGet(); + response.addCallback(defaultCB); } else { - KuduSession session = client.syncClient().newSession(); - session.apply(operation); - session.flush(); - return processResponse(session.close()); + processResponse(response.join()); } + + session.close(); + return !errorTransactions.get(); + } @Override public void close() throws Exception { - if (client == null) return; + while(pendingTransactions.get() > 0) { + LOG.info("sleeping {}s by pending transactions", pendingTransactions.get()); + Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds()); + } + if (client == null) return; client.close(); } - private Boolean processResponse(List<OperationResponse> operationResponses) { - Boolean isOk = operationResponses.isEmpty(); - for(OperationResponse operationResponse : operationResponses) { + private class ResponseCallback implements Callback<Boolean, OperationResponse> { + @Override + public Boolean call(OperationResponse operationResponse) { + 
pendingTransactions.decrementAndGet(); + processResponse(operationResponse); + return errorTransactions.get(); + } + } + + protected void processResponse(OperationResponse operationResponse) { + if (operationResponse == null) return; + + if (operationResponse.hasRowError()) { logResponseError(operationResponse.getRowError()); + errorTransactions.set(true); } - return isOk; } private void logResponseError(RowError error) { LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString()); } - private class ResponseCallback implements Callback<Boolean, List<OperationResponse>> { - @Override - public Boolean call(List<OperationResponse> operationResponses) { - return processResponse(operationResponses); - } - } } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java index b1366ba..86b683f 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.connectors.kudu.connector; -import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.KuduTable; @@ -25,75 +24,73 @@ import org.apache.kudu.client.Operation; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RowResult; -import java.util.List; - -public final class KuduMapper { +final class KuduMapper { private KuduMapper() { } - public static KuduRow toKuduRow(RowResult row) { + static KuduRow toKuduRow(RowResult row) { Schema schema = row.getColumnProjection(); - List<ColumnSchema> columns = schema.getColumns(); - KuduRow values = new KuduRow(columns.size()); - for (int i = 0; i < columns.size(); i++) { - String name = 
schema.getColumnByIndex(i).getName(); - if(row.isNull(i)) { - values.setField(i, name, null); + KuduRow values = new KuduRow(schema.getColumnCount()); + schema.getColumns().forEach(column -> { + String name = column.getName(); + int pos = schema.getColumnIndex(name); + if(row.isNull(name)) { + values.setField(pos, name, null); } else { - Type type = schema.getColumnByIndex(i).getType(); + Type type = column.getType(); switch (type) { case BINARY: - values.setField(i, name, row.getBinary(i)); + values.setField(pos, name, row.getBinary(name)); break; case STRING: - values.setField(i, name, row.getString(i)); + values.setField(pos, name, row.getString(name)); break; case BOOL: - values.setField(i, name, row.getBoolean(i)); + values.setField(pos, name, row.getBoolean(name)); break; case DOUBLE: - values.setField(i, name, row.getDouble(i)); + values.setField(pos, name, row.getDouble(name)); break; case FLOAT: - values.setField(i, name, row.getFloat(i)); + values.setField(pos, name, row.getFloat(name)); break; case INT8: - values.setField(i, name, row.getByte(i)); + values.setField(pos, name, row.getByte(name)); break; case INT16: - values.setField(i, name, row.getShort(i)); + values.setField(pos, name, row.getShort(name)); break; case INT32: - values.setField(i, name, row.getInt(i)); + values.setField(pos, name, row.getInt(name)); break; case INT64: + values.setField(pos, name, row.getLong(name)); + break; case UNIXTIME_MICROS: - values.setField(i, name, row.getLong(i)); + values.setField(pos, name, row.getLong(name) / 1000); break; default: throw new IllegalArgumentException("Illegal var type: " + type); } } - } + }); return values; } - public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) { + static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) { final Operation operation = toOperation(table, writeMode); final PartialRow partialRow = operation.getRow(); - Schema schema = 
table.getSchema(); - List<ColumnSchema> columns = schema.getColumns(); + table.getSchema().getColumns().forEach(column -> { + String columnName = column.getName(); + Object value = row.getField(column.getName()); - for (int i = 0; i < columns.size(); i++) { - String columnName = schema.getColumnByIndex(i).getName(); - Object value = row.getField(i); if (value == null) { partialRow.setNull(columnName); } else { - Type type = schema.getColumnByIndex(i).getType(); + Type type = column.getType(); switch (type) { case STRING: partialRow.addString(columnName, (String) value); @@ -130,11 +127,11 @@ public final class KuduMapper { throw new IllegalArgumentException("Illegal var type: " + type); } } - } + }); return operation; } - public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) { + static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) { switch (writeMode) { case INSERT: return table.newInsert(); case UPDATE: return table.newUpdate(); diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java index 03f5e5c..3c57a1b 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java @@ -17,11 +17,13 @@ package org.apache.flink.streaming.connectors.kudu.connector; import org.apache.flink.types.Row; -import org.apache.kudu.Schema; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.util.*; +import java.util.Arrays; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.stream.Stream; public class KuduRow extends Row { @@ -33,24 +35,6 @@ public class KuduRow extends Row { rowNames = new LinkedHashMap<>(); } - public KuduRow(Object 
object, Schema schema) { - super(validFields(object)); - for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) { - basicValidation(c.getDeclaredFields()) - .filter(field -> schema.getColumn(field.getName()) != null) - .forEach(cField -> { - try { - cField.setAccessible(true); - setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object)); - } catch (IllegalAccessException e) { - String error = String.format("Cannot get value for %s", cField.getName()); - throw new IllegalArgumentException(error, e); - } - }); - } - } - - public Object getField(String name) { return super.getField(rowNames.get(name)); } @@ -60,6 +44,10 @@ public class KuduRow extends Row { this.rowNames.put(name, pos); } + public boolean isNull(String name) { + return isNull(rowNames.get(name)); + } + public boolean isNull(int pos) { return getField(pos) == null; } @@ -86,50 +74,6 @@ public class KuduRow extends Row { return toRet; } - public <P> P blind(Class<P> clazz) { - P o = createInstance(clazz); - - for (Class<?> c = clazz; c != null; c = c.getSuperclass()) { - Field[] fields = c.getDeclaredFields(); - for (Field cField : fields) { - try { - if(rowNames.containsKey(cField.getName()) - && !Modifier.isStatic(cField.getModifiers()) - && !Modifier.isTransient(cField.getModifiers())) { - - cField.setAccessible(true); - Object value = getField(cField.getName()); - if (value != null) { - if (cField.getType() == value.getClass()) { - cField.set(o, value); - } else if (cField.getType() == Long.class && value.getClass() == Date.class) { - cField.set(o, ((Date) value).getTime()); - } else { - cField.set(o, value); - } - } - } - } catch (IllegalAccessException e) { - String error = String.format("Cannot get value for %s", cField.getName()); - throw new IllegalArgumentException(error, e); - } - } - } - - return o; - - } - - - private <P> P createInstance(Class<P> clazz) { - try { - return clazz.getConstructor().newInstance(); - } catch 
(ReflectiveOperationException e) { - String error = String.format("Cannot create instance for %s", clazz.getSimpleName()); - throw new IllegalArgumentException(error, e); - } - } - @Override public String toString() { return blindMap().toString(); diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java new file mode 100644 index 0000000..46cbff1 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu.connector; + +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; + +public class KuduRowIterator { + + private KuduScanner scanner; + private RowResultIterator rowIterator; + + public KuduRowIterator(KuduScanner scanner) throws KuduException { + this.scanner = scanner; + nextRows(); + } + + public void close() throws KuduException { + scanner.close(); + } + + public boolean hasNext() throws KuduException { + if (rowIterator.hasNext()) { + return true; + } else if (scanner.hasMoreRows()) { + nextRows(); + return true; + } else { + return false; + } + } + + public KuduRow next() { + RowResult row = this.rowIterator.next(); + return KuduMapper.toKuduRow(row); + } + + private void nextRows() throws KuduException { + this.rowIterator = scanner.nextRows(); + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java new file mode 100644 index 0000000..c12eb42 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kudu.serde; + +import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.kudu.Schema; + +public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> { + + @Override + public KuduRow deserialize(KuduRow row) { + return row; + } + + @Override + public KuduRow serialize(KuduRow value) { + return value; + } + + @Override + public DefaultSerDe withSchema(Schema schema) { + return this; + } + +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java new file mode 100644 index 0000000..355a516 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kudu.serde; + +import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; + +import java.io.Serializable; + +public interface KuduDeserialization<T> extends Serializable { + T deserialize(KuduRow row); +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java new file mode 100644 index 0000000..99db1dc --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu.serde; + +import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.kudu.Schema; + +import java.io.Serializable; + +public interface KuduSerialization<T> extends Serializable { + KuduRow serialize(T value); + + KuduSerialization<T> withSchema(Schema schema); +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java new file mode 100644 index 0000000..1063aa2 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java @@ -0,0 +1,134 @@ +/* + * Licensed serialize the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file serialize You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed serialize in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu.serde; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.kudu.Schema; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.Date; +import java.util.stream.Stream; + +public class PojoSerDe<P> implements KuduSerialization<P>, KuduDeserialization<P> { + + + private Class<P> clazz; + + public transient KuduTableInfo tableInfo; + public transient Schema schema; + + + public PojoSerDe(Class<P> clazz) { + this.clazz = clazz; + } + + @Override + public PojoSerDe<P> withSchema(Schema schema) { + this.schema = schema; + return this; + } + + @Override + public KuduRow serialize(P object) { + return mapTo(object); + } + + private KuduRow mapTo(P object) { + if (schema == null) throw new IllegalArgumentException("schema must be set to serialize"); + + KuduRow row = new KuduRow(schema.getRowSize()); + + for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) { + basicValidation(c.getDeclaredFields()) + .forEach(cField -> { + try { + cField.setAccessible(true); + row.setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object)); + } catch (IllegalAccessException e) { + String error = String.format("Cannot get value for %s", cField.getName()); + throw new IllegalArgumentException(error, e); + } + }); + } + + return row; + } + + private Stream<Field> basicValidation(Field[] fields) { + return Arrays.stream(fields) + .filter(field -> schemaHasColumn(field.getName())) + .filter(field -> !Modifier.isStatic(field.getModifiers())) + .filter(field -> !Modifier.isTransient(field.getModifiers())); + } + + private boolean schemaHasColumn(String field) { + return schema.getColumns().stream().anyMatch(col -> 
StringUtils.equalsIgnoreCase(col.getName(),field)); + } + + @Override + public P deserialize(KuduRow row) { + return mapFrom(row); + } + + private P mapFrom(KuduRow row) { + P o = createInstance(clazz); + + for (Class<?> c = clazz; c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + + basicValidation(fields) + .forEach(cField -> { + try { + cField.setAccessible(true); + Object value = row.getField(cField.getName()); + if (value != null) { + if (cField.getType() == value.getClass()) { + cField.set(o, value); + } else if (cField.getType() == Long.class && value.getClass() == Date.class) { + cField.set(o, ((Date) value).getTime()); + } else { + cField.set(o, value); + } + } + } catch (IllegalAccessException e) { + String error = String.format("Cannot get value for %s", cField.getName()); + throw new IllegalArgumentException(error, e); + } + }); + } + + return o; + + } + + private P createInstance(Class<P> clazz) { + try { + Constructor<P> constructor = clazz.getDeclaredConstructor(); + constructor.setAccessible(true); + return constructor.newInstance(); + } catch (ReflectiveOperationException e) { + String error = String.format("Cannot create instance for %s", clazz.getSimpleName()); + throw new IllegalArgumentException(error, e); + } + } + +} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java index e282185..35982f4 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kudu; import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase; import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import 
org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -32,18 +33,18 @@ public class KuduOuputFormatTest extends KuduDatabase { @Test public void testInvalidKuduMaster() throws IOException { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo)); + Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe())); } @Test public void testInvalidTableInfo() throws IOException { - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null)); + Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe())); } @Test public void testNotTableExist() throws IOException { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo); + KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe()); Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1)); } @@ -51,7 +52,7 @@ public class KuduOuputFormatTest extends KuduDatabase { public void testOutputWithStrongConsistency() throws Exception { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo) + KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe()) .withStrongConsistency(); outputFormat.open(0,1); @@ -70,7 +71,7 @@ public class KuduOuputFormatTest extends KuduDatabase { public void testOutputWithEventualConsistency() throws Exception { KuduTableInfo tableInfo = 
booksTableInfo(UUID.randomUUID().toString(),true); - KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo) + KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe()) .withEventualConsistency(); outputFormat.open(0,1); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java index a89580f..3ca9b9a 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase; import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -33,18 +34,18 @@ public class KuduSinkTest extends KuduDatabase { @Test public void testInvalidKuduMaster() throws IOException { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo)); + Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe())); } @Test public void testInvalidTableInfo() throws IOException { - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null)); + Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe())); } @Test public void testNotTableExist() throws IOException { KuduTableInfo tableInfo = 
booksTableInfo(UUID.randomUUID().toString(),false); - KuduSink sink = new KuduSink<>(hostsCluster, tableInfo); + KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe()); Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration())); } @@ -52,7 +53,7 @@ public class KuduSinkTest extends KuduDatabase { public void testOutputWithStrongConsistency() throws Exception { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduSink sink = new KuduSink<>(hostsCluster, tableInfo) + KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe()) .withStrongConsistency(); sink.open(new Configuration()); @@ -69,7 +70,7 @@ public class KuduSinkTest extends KuduDatabase { @Test public void testOutputWithEventualConsistency() throws Exception { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduSink sink = new KuduSink<>(hostsCluster, tableInfo) + KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe()) .withEventualConsistency(); sink.open(new Configuration()); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java index 41e59b7..99efbd1 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java @@ -67,7 +67,7 @@ public class KuduDatabase { KuduConnector tableContext = new KuduConnector(hostsCluster, tableInfo); booksDataRow().forEach(row -> { try { - tableContext.writeRow(row, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT); + tableContext.writeRow(row); }catch (Exception e) { e.printStackTrace(); } diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java new file mode 100644 index 0000000..afe57ca --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu.serde; + +import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo; +import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.kudu.Type; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PojoSerDeTest { + + public class TestPojo { + private String field1; + private String field2; + private String field3; + + public TestPojo() { + field1 = "field1"; + field2 = "field2"; + field3 = "field3"; + } + } + + @Test + public void testFieldsNotInSchema() { + + TestPojo pojo = new TestPojo(); + + KuduTableInfo tableInfo = KuduTableInfo.Builder.create("test") + .addColumn(KuduColumnInfo.Builder.create("field1", Type.STRING).key(true).hashKey(true).build()) + .addColumn(KuduColumnInfo.Builder.create("field2", Type.STRING).build()) + .build(); + + KuduRow row = new PojoSerDe<>(TestPojo.class).withSchema(tableInfo.getSchema()).serialize(pojo); + + Assertions.assertEquals(2, row.blindMap().size()); + Assertions.assertEquals("field1", row.getField("field1")); + Assertions.assertEquals("field2", row.getField("field2")); + + } +}