This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 247dfe99486f04622b4ee7d83fa51ef253aaa75d
Author: eskabetxe <b...@boto.pro>
AuthorDate: Tue Jan 15 13:05:29 2019 +0100

    [BAHIR-180] Improve eventual consistency for Kudu connector
    
    Closes #40
---
 flink-connector-kudu/pom.xml                       |   2 +-
 .../streaming/connectors/kudu/KuduInputFormat.java |  27 ++---
 .../connectors/kudu/KuduOutputFormat.java          |  36 +++---
 .../flink/streaming/connectors/kudu/KuduSink.java  |  34 +++---
 .../connectors/kudu/connector/KuduColumnInfo.java  |  50 ++++++++
 .../connectors/kudu/connector/KuduConnector.java   |  81 ++++++++-----
 .../connectors/kudu/connector/KuduMapper.java      |  59 +++++----
 .../connectors/kudu/connector/KuduRow.java         |  72 ++---------
 .../connectors/kudu/connector/KuduRowIterator.java |  57 +++++++++
 .../connectors/kudu/serde/DefaultSerDe.java        |  39 ++++++
 .../connectors/kudu/serde/KuduDeserialization.java |  25 ++++
 .../connectors/kudu/serde/KuduSerialization.java   |  28 +++++
 .../streaming/connectors/kudu/serde/PojoSerDe.java | 134 +++++++++++++++++++++
 .../connectors/kudu/KuduOuputFormatTest.java       |  11 +-
 .../streaming/connectors/kudu/KuduSinkTest.java    |  11 +-
 .../connectors/kudu/connector/KuduDatabase.java    |   2 +-
 .../connectors/kudu/serde/PojoSerDeTest.java       |  57 +++++++++
 17 files changed, 543 insertions(+), 182 deletions(-)
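For readers who want to see the reworked API end to end, here is a minimal sketch of how the new serde-based sink introduced by this patch might be wired into a Flink job. It is not part of the commit: the Book POJO, the "books" table layout, and the master address kudu-master:7051 are hypothetical placeholders, and a reachable Kudu cluster is assumed; only the new KuduSink constructor, the KuduColumnInfo.Builder shortcuts, PojoSerDe, and withEventualConsistency() come from the diff below.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kudu.KuduSink;
    import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo;
    import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
    import org.apache.flink.streaming.connectors.kudu.serde.PojoSerDe;

    public class KuduSinkExample {

        // Hypothetical POJO; field names must match the Kudu column names.
        public static class Book {
            public int id;
            public String title;
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Table definition built with the KuduColumnInfo.Builder shortcuts added in this patch.
            KuduTableInfo tableInfo = KuduTableInfo.Builder.create("books")
                    .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
                    .addColumn(KuduColumnInfo.Builder.createString("title").build())
                    .build();

            Book book = new Book();
            book.id = 1;
            book.title = "Example";
            DataStream<Book> books = env.fromElements(book);

            // The sink now takes a KuduSerialization; PojoSerDe maps POJO fields onto the table schema.
            books.addSink(new KuduSink<>("kudu-master:7051", tableInfo, new PojoSerDe<>(Book.class))
                    .withEventualConsistency());

            env.execute("Kudu sink example");
        }
    }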

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 5540fc1..61ab4a6 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <kudu.version>1.8.0</kudu.version>
+    <kudu.version>1.7.1</kudu.version>
 
     <junit.groups>!DockerTest</junit.groups>
   </properties>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
index 617e317..fd126d0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
@@ -24,7 +24,9 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.streaming.connectors.kudu.connector.*;
 import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.*;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.LocatedTablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,8 +45,7 @@ public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.Ku
     private boolean endReached;
 
     private transient KuduConnector tableContext;
-    private transient KuduScanner scanner;
-    private transient RowResultIterator resultIterator;
+    private transient KuduRowIterator resultIterator;
 
     private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class);
 
@@ -90,15 +91,14 @@ public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.Ku
         endReached = false;
         startTableContext();
 
-        scanner = tableContext.scanner(split.getScanToken());
-        resultIterator = scanner.nextRows();
+        resultIterator = tableContext.scanner(split.getScanToken());
     }
 
     @Override
     public void close() {
-        if (scanner != null) {
+        if (resultIterator != null) {
             try {
-                scanner.close();
+                resultIterator.close();
             } catch (KuduException e) {
                 e.printStackTrace();
             }
@@ -168,18 +168,11 @@ public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.Ku
     public KuduRow nextRecord(KuduRow reuse) throws IOException {
         // check that current iterator has next rows
         if (this.resultIterator.hasNext()) {
-            RowResult row = this.resultIterator.next();
-            return KuduMapper.toKuduRow(row);
-        }
-        // if not, check that current scanner has more iterators
-        else if (scanner.hasMoreRows()) {
-            this.resultIterator = scanner.nextRows();
-            return nextRecord(reuse);
-        }
-        else {
+            return resultIterator.next();
+        } else {
             endReached = true;
+            return null;
         }
-        return null;
     }
 
     /**
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
index 5c23f36..9d12710 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
@@ -16,18 +16,19 @@
  */
 package org.apache.flink.streaming.connectors.kudu;
 
-import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> {
+public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
 
@@ -36,10 +37,12 @@ public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT>
     private KuduConnector.Consistency consistency;
     private KuduConnector.WriteMode writeMode;
 
-    private transient KuduConnector tableContext;
+    private KuduSerialization<OUT> serializer;
 
+    private transient KuduConnector connector;
 
-    public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) {
+
+    public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
         Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
         this.kuduMasters = kuduMasters;
 
@@ -47,8 +50,10 @@ public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT>
         this.tableInfo = tableInfo;
         this.consistency = KuduConnector.Consistency.STRONG;
         this.writeMode = KuduConnector.WriteMode.UPSERT;
+        this.serializer = serializer.withSchema(tableInfo.getSchema());
     }
 
+
     public KuduOutputFormat<OUT> withEventualConsistency() {
         this.consistency = KuduConnector.Consistency.EVENTUAL;
         return this;
@@ -81,28 +86,31 @@ public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT>
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        startTableContext();
-    }
-
-    private void startTableContext() throws IOException {
-        if (tableContext != null) return;
-        tableContext = new KuduConnector(kuduMasters, tableInfo);
+        if (connector != null) return;
+        connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+        serializer = serializer.withSchema(tableInfo.getSchema());
     }
 
     @Override
-    public void writeRecord(OUT kuduRow) throws IOException {
+    public void writeRecord(OUT row) throws IOException {
+        boolean response;
         try {
-            tableContext.writeRow(kuduRow, consistency, writeMode);
+            KuduRow kuduRow = serializer.serialize(row);
+            response = connector.writeRow(kuduRow);
         } catch (Exception e) {
             throw new IOException(e.getLocalizedMessage(), e);
         }
+
+        if(!response) {
+            throw new IOException("error with some transaction");
+        }
     }
 
     @Override
     public void close() throws IOException {
-        if (this.tableContext == null) return;
+        if (this.connector == null) return;
         try {
-            this.tableContext.close();
+            this.connector.close();
         } catch (Exception e) {
             throw new IOException(e.getLocalizedMessage(), e);
         }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
index 120d5c5..53cf249 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
@@ -21,13 +21,14 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> {
+public class KuduSink<OUT> extends RichSinkFunction<OUT> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
 
@@ -36,10 +37,11 @@ public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> {
     private KuduConnector.Consistency consistency;
     private KuduConnector.WriteMode writeMode;
 
-    private transient KuduConnector tableContext;
+    private KuduSerialization<OUT> serializer;
 
+    private transient KuduConnector connector;
 
-    public KuduSink(String kuduMasters, KuduTableInfo tableInfo) {
+    public KuduSink(String kuduMasters, KuduTableInfo tableInfo, 
KuduSerialization<OUT> serializer) {
         Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be 
null");
         this.kuduMasters = kuduMasters;
 
@@ -47,6 +49,7 @@ public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> {
         this.tableInfo = tableInfo;
         this.consistency = KuduConnector.Consistency.STRONG;
         this.writeMode = KuduConnector.WriteMode.UPSERT;
+        this.serializer = serializer.withSchema(tableInfo.getSchema());
     }
 
     public KuduSink<OUT> withEventualConsistency() {
@@ -76,29 +79,26 @@ public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> {
 
     @Override
     public void open(Configuration parameters) throws IOException {
-        startTableContext();
+        if (connector != null) return;
+        connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+        serializer.withSchema(tableInfo.getSchema());
     }
 
-    private void startTableContext() throws IOException {
-        if (tableContext != null) return;
-        tableContext = new KuduConnector(kuduMasters, tableInfo);
-    }
-
-
     @Override
-    public void invoke(OUT kuduRow) throws Exception {
-        try {
-            tableContext.writeRow(kuduRow, consistency, writeMode);
-        } catch (Exception e) {
-            throw new IOException(e.getLocalizedMessage(), e);
+    public void invoke(OUT row) throws Exception {
+        KuduRow kuduRow = serializer.serialize(row);
+        boolean response = connector.writeRow(kuduRow);
+
+        if(!response) {
+            throw new IOException("error with some transaction");
         }
     }
 
     @Override
     public void close() throws Exception {
-        if (this.tableContext == null) return;
+        if (this.connector == null) return;
         try {
-            this.tableContext.close();
+            this.connector.close();
         } catch (Exception e) {
             throw new IOException(e.getLocalizedMessage(), e);
         }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
index 4dfc0b8..fa7472f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
@@ -80,22 +80,72 @@ public class KuduColumnInfo implements Serializable {
         public static Builder create(String name, Type type) {
             return new Builder(name, type);
         }
+        public static Builder createByte(String name) {
+            return create(name, Type.INT8);
+        }
+        public static Builder createShort(String name) {
+            return create(name, Type.INT16);
+        }
+        public static Builder createInteger(String name) {
+            return create(name, Type.INT32);
+        }
+        public static Builder createLong(String name) {
+            return create(name, Type.INT64);
+        }
+        public static Builder createDouble(String name) {
+            return create(name, Type.DOUBLE);
+        }
+        public static Builder createFloat(String name) {
+            return create(name, Type.FLOAT);
+        }
+        public static Builder createString(String name) {
+            return create(name, Type.STRING);
+        }
+        public static Builder createBool(String name) {
+            return create(name, Type.BOOL);
+        }
+        public static Builder createByteArray(String name) {
+            return create(name, Type.BINARY);
+        }
+        public static Builder createUnixTime(String name) {
+            return create(name, Type.UNIXTIME_MICROS);
+        }
+
+        public Builder asKey() {
+            return key(true);
+        }
 
         public Builder key(boolean key) {
             this.column.key = key;
             return this;
         }
 
+        public Builder asRangeKey() {
+            return rangeKey(true);
+        }
+
         public Builder rangeKey(boolean rangeKey) {
             this.column.rangeKey = rangeKey;
             return this;
         }
 
+        public Builder asHashKey() {
+            return hashKey(true);
+        }
+
         public Builder hashKey(boolean hashKey) {
             this.column.hashKey = hashKey;
             return this;
         }
 
+        public Builder asNullable() {
+            return nullable(true);
+        }
+
+        public Builder asNotNullable() {
+            return nullable(false);
+        }
+
         public Builder nullable(boolean nullable) {
             this.column.nullable = nullable;
             return this;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
index 0e2e6bc..a3851c4 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
@@ -17,27 +17,46 @@
 package org.apache.flink.streaming.connectors.kudu.connector;
 
 import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.time.Time;
 import org.apache.kudu.client.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class KuduConnector implements AutoCloseable {
 
     private final Logger LOG = LoggerFactory.getLogger(this.getClass());
 
+    private Callback<Boolean, OperationResponse> defaultCB;
+
     public enum Consistency {EVENTUAL, STRONG};
     public enum WriteMode {INSERT,UPDATE,UPSERT}
 
     private AsyncKuduClient client;
     private KuduTable table;
 
+    private Consistency consistency;
+    private WriteMode writeMode;
+
+    private static AtomicInteger pendingTransactions = new AtomicInteger();
+    private static AtomicBoolean errorTransactions = new AtomicBoolean(false);
+
     public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
-        client = client(kuduMasters);
-        table = table(tableInfo);
+        this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT);
+    }
+
+    public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode) throws IOException {
+        this.client = client(kuduMasters);
+        this.table = table(tableInfo);
+        this.consistency = consistency;
+        this.writeMode = writeMode;
+        this.defaultCB = new ResponseCallback();
     }
 
     private AsyncKuduClient client(String kuduMasters) {
@@ -63,8 +82,8 @@ public class KuduConnector implements AutoCloseable {
         return true;
     }
 
-    public KuduScanner scanner(byte[] token) throws IOException {
-        return KuduScanToken.deserializeIntoScanner(token, client.syncClient());
+    public KuduRowIterator scanner(byte[] token) throws IOException {
+        return new KuduRowIterator(KuduScanToken.deserializeIntoScanner(token, client.syncClient()));
     }
 
     public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) {
@@ -82,52 +101,60 @@ public class KuduConnector implements AutoCloseable {
 
         if (rowLimit !=null && rowLimit > 0) {
             tokenBuilder.limit(rowLimit);
-            // FIXME: https://issues.apache.org/jira/browse/KUDU-16
-            // Server side limit() operator for java-based scanners are not implemented yet
         }
 
         return tokenBuilder.build();
     }
 
-    public boolean writeRow(KuduRow row, Consistency consistency, WriteMode writeMode) throws Exception {
+    public boolean writeRow(KuduRow row) throws Exception {
         final Operation operation = KuduMapper.toOperation(table, writeMode, row);
 
-        if (Consistency.EVENTUAL.equals(consistency)) {
-            AsyncKuduSession session = client.newSession();
-            session.apply(operation);
-            session.flush();
-            return session.close().addCallback(new ResponseCallback()).join();
+        AsyncKuduSession session = client.newSession();
+        Deferred<OperationResponse> response = session.apply(operation);
+
+        if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) {
+            pendingTransactions.incrementAndGet();
+            response.addCallback(defaultCB);
         } else {
-            KuduSession session = client.syncClient().newSession();
-            session.apply(operation);
-            session.flush();
-            return processResponse(session.close());
+            processResponse(response.join());
         }
+
+        session.close();
+        return !errorTransactions.get();
+
     }
 
     @Override
     public void close() throws Exception {
-        if (client == null) return;
+        while(pendingTransactions.get() > 0) {
+            LOG.info("sleeping {}s by pending transactions", pendingTransactions.get());
+            Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds());
+        }
 
+        if (client == null) return;
         client.close();
     }
 
-    private Boolean processResponse(List<OperationResponse> operationResponses) {
-        Boolean isOk = operationResponses.isEmpty();
-        for(OperationResponse operationResponse : operationResponses) {
+    private class ResponseCallback implements Callback<Boolean, OperationResponse> {
+        @Override
+        public Boolean call(OperationResponse operationResponse) {
+            pendingTransactions.decrementAndGet();
+            processResponse(operationResponse);
+            return errorTransactions.get();
+        }
+    }
+
+    protected void processResponse(OperationResponse operationResponse) {
+        if (operationResponse == null) return;
+
+        if (operationResponse.hasRowError()) {
             logResponseError(operationResponse.getRowError());
+            errorTransactions.set(true);
         }
-        return isOk;
     }
 
     private void logResponseError(RowError error) {
         LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString());
     }
 
-    private class ResponseCallback implements Callback<Boolean, List<OperationResponse>> {
-        @Override
-        public Boolean call(List<OperationResponse> operationResponses) {
-            return processResponse(operationResponses);
-        }
-    }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
index b1366ba..86b683f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
@@ -17,7 +17,6 @@
 package org.apache.flink.streaming.connectors.kudu.connector;
 
 
-import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduTable;
@@ -25,75 +24,73 @@ import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowResult;
 
-import java.util.List;
-
-public final class KuduMapper {
+final class KuduMapper {
 
     private KuduMapper() { }
 
-    public static KuduRow toKuduRow(RowResult row) {
+    static KuduRow toKuduRow(RowResult row) {
         Schema schema = row.getColumnProjection();
-        List<ColumnSchema> columns = schema.getColumns();
 
-        KuduRow values = new KuduRow(columns.size());
-        for (int i = 0; i < columns.size(); i++) {
-            String name = schema.getColumnByIndex(i).getName();
-            if(row.isNull(i)) {
-                values.setField(i, name, null);
+        KuduRow values = new KuduRow(schema.getColumnCount());
+        schema.getColumns().forEach(column -> {
+            String name = column.getName();
+            int pos = schema.getColumnIndex(name);
+            if(row.isNull(name)) {
+                values.setField(pos, name, null);
             } else {
-                Type type = schema.getColumnByIndex(i).getType();
+                Type type = column.getType();
                 switch (type) {
                     case BINARY:
-                        values.setField(i, name, row.getBinary(i));
+                        values.setField(pos, name, row.getBinary(name));
                         break;
                     case STRING:
-                        values.setField(i, name, row.getString(i));
+                        values.setField(pos, name, row.getString(name));
                         break;
                     case BOOL:
-                        values.setField(i, name, row.getBoolean(i));
+                        values.setField(pos, name, row.getBoolean(name));
                         break;
                     case DOUBLE:
-                        values.setField(i, name, row.getDouble(i));
+                        values.setField(pos, name, row.getDouble(name));
                         break;
                     case FLOAT:
-                        values.setField(i, name, row.getFloat(i));
+                        values.setField(pos, name, row.getFloat(name));
                         break;
                     case INT8:
-                        values.setField(i, name, row.getByte(i));
+                        values.setField(pos, name, row.getByte(name));
                         break;
                     case INT16:
-                        values.setField(i, name, row.getShort(i));
+                        values.setField(pos, name, row.getShort(name));
                         break;
                     case INT32:
-                        values.setField(i, name, row.getInt(i));
+                        values.setField(pos, name, row.getInt(name));
                         break;
                     case INT64:
+                        values.setField(pos, name, row.getLong(name));
+                        break;
                     case UNIXTIME_MICROS:
-                        values.setField(i, name, row.getLong(i));
+                        values.setField(pos, name, row.getLong(name) / 1000);
                         break;
                     default:
                         throw new IllegalArgumentException("Illegal var type: " + type);
                 }
             }
-        }
+        });
         return values;
     }
 
 
-    public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
+    static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
         final Operation operation = toOperation(table, writeMode);
         final PartialRow partialRow = operation.getRow();
 
-        Schema schema = table.getSchema();
-        List<ColumnSchema> columns = schema.getColumns();
+        table.getSchema().getColumns().forEach(column -> {
+            String columnName = column.getName();
+            Object value = row.getField(column.getName());
 
-        for (int i = 0; i < columns.size(); i++) {
-            String columnName = schema.getColumnByIndex(i).getName();
-            Object value = row.getField(i);
             if (value == null) {
                 partialRow.setNull(columnName);
             } else {
-                Type type = schema.getColumnByIndex(i).getType();
+                Type type = column.getType();
                 switch (type) {
                     case STRING:
                         partialRow.addString(columnName, (String) value);
@@ -130,11 +127,11 @@ public final class KuduMapper {
                         throw new IllegalArgumentException("Illegal var type: " + type);
                 }
             }
-        }
+        });
         return operation;
     }
 
-    public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
+    static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
         switch (writeMode) {
             case INSERT: return table.newInsert();
             case UPDATE: return table.newUpdate();
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
index 03f5e5c..3c57a1b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
@@ -17,11 +17,13 @@
 package org.apache.flink.streaming.connectors.kudu.connector;
 
 import org.apache.flink.types.Row;
-import org.apache.kudu.Schema;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 public class KuduRow extends Row {
@@ -33,24 +35,6 @@ public class KuduRow extends Row {
         rowNames = new LinkedHashMap<>();
     }
 
-    public KuduRow(Object object, Schema schema) {
-        super(validFields(object));
-        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
-            basicValidation(c.getDeclaredFields())
-                    .filter(field -> schema.getColumn(field.getName()) != null)
-                    .forEach(cField -> {
-                        try {
-                            cField.setAccessible(true);
-                            setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
-                        } catch (IllegalAccessException e) {
-                            String error = String.format("Cannot get value for %s", cField.getName());
-                            throw new IllegalArgumentException(error, e);
-                        }
-                    });
-        }
-    }
-
-
     public Object getField(String name) {
         return super.getField(rowNames.get(name));
     }
@@ -60,6 +44,10 @@ public class KuduRow extends Row {
         this.rowNames.put(name, pos);
     }
 
+    public boolean isNull(String name) {
+        return isNull(rowNames.get(name));
+    }
+
     public boolean isNull(int pos) {
         return getField(pos) == null;
     }
@@ -86,50 +74,6 @@ public class KuduRow extends Row {
         return  toRet;
     }
 
-    public <P> P blind(Class<P> clazz) {
-        P o = createInstance(clazz);
-
-        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
-            Field[] fields = c.getDeclaredFields();
-            for (Field cField : fields) {
-                try {
-                    if(rowNames.containsKey(cField.getName())
-                            && !Modifier.isStatic(cField.getModifiers())
-                            && !Modifier.isTransient(cField.getModifiers())) {
-
-                        cField.setAccessible(true);
-                        Object value = getField(cField.getName());
-                        if (value != null) {
-                            if (cField.getType() == value.getClass()) {
-                                cField.set(o, value);
-                            } else if (cField.getType() == Long.class && value.getClass() == Date.class) {
-                                cField.set(o, ((Date) value).getTime());
-                            } else {
-                                cField.set(o, value);
-                            }
-                        }
-                    }
-                } catch (IllegalAccessException e) {
-                    String error = String.format("Cannot get value for %s", cField.getName());
-                    throw new IllegalArgumentException(error, e);
-                }
-            }
-        }
-
-        return o;
-
-    }
-
-
-    private <P> P createInstance(Class<P> clazz) {
-        try {
-            return clazz.getConstructor().newInstance();
-        } catch (ReflectiveOperationException e) {
-            String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
-            throw new IllegalArgumentException(error, e);
-        }
-    }
-
     @Override
     public String toString() {
         return blindMap().toString();
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
new file mode 100644
index 0000000..46cbff1
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
+public class KuduRowIterator {
+
+    private KuduScanner scanner;
+    private RowResultIterator rowIterator;
+
+    public KuduRowIterator(KuduScanner scanner) throws KuduException {
+        this.scanner = scanner;
+        nextRows();
+    }
+
+    public void close() throws KuduException {
+        scanner.close();
+    }
+
+    public boolean hasNext() throws KuduException {
+        if (rowIterator.hasNext()) {
+            return true;
+        } else if (scanner.hasMoreRows()) {
+            nextRows();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public KuduRow next() {
+        RowResult row = this.rowIterator.next();
+        return KuduMapper.toKuduRow(row);
+    }
+
+    private void nextRows() throws KuduException {
+        this.rowIterator = scanner.nextRows();
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
new file mode 100644
index 0000000..c12eb42
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.kudu.Schema;
+
+public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> {
+
+    @Override
+    public KuduRow deserialize(KuduRow row) {
+        return row;
+    }
+
+    @Override
+    public KuduRow serialize(KuduRow value) {
+        return value;
+    }
+
+    @Override
+    public DefaultSerDe withSchema(Schema schema) {
+        return this;
+    }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
new file mode 100644
index 0000000..355a516
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+
+import java.io.Serializable;
+
+public interface KuduDeserialization<T> extends Serializable {
+    T deserialize(KuduRow row);
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
new file mode 100644
index 0000000..99db1dc
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.kudu.Schema;
+
+import java.io.Serializable;
+
+public interface KuduSerialization<T> extends Serializable {
+    KuduRow serialize(T value);
+
+    KuduSerialization<T> withSchema(Schema schema);
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
new file mode 100644
index 0000000..1063aa2
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.kudu.Schema;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.stream.Stream;
+
+public class PojoSerDe<P> implements KuduSerialization<P>, KuduDeserialization<P> {
+
+
+    private Class<P> clazz;
+
+    public transient KuduTableInfo tableInfo;
+    public transient Schema schema;
+
+
+    public PojoSerDe(Class<P> clazz) {
+        this.clazz = clazz;
+    }
+
+    @Override
+    public PojoSerDe<P> withSchema(Schema schema) {
+        this.schema = schema;
+        return this;
+    }
+
+    @Override
+    public KuduRow serialize(P object) {
+        return mapTo(object);
+    }
+
+    private KuduRow mapTo(P object) {
+        if (schema == null) throw new IllegalArgumentException("schema must be set to serialize");
+
+        KuduRow row = new KuduRow(schema.getRowSize());
+
+        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
+            basicValidation(c.getDeclaredFields())
+                    .forEach(cField -> {
+                        try {
+                            cField.setAccessible(true);
+                            row.setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
+                        } catch (IllegalAccessException e) {
+                            String error = String.format("Cannot get value for %s", cField.getName());
+                            throw new IllegalArgumentException(error, e);
+                        }
+                    });
+        }
+
+        return row;
+    }
+
+    private Stream<Field> basicValidation(Field[] fields) {
+        return Arrays.stream(fields)
+                .filter(field -> schemaHasColumn(field.getName()))
+                .filter(field -> !Modifier.isStatic(field.getModifiers()))
+                .filter(field -> !Modifier.isTransient(field.getModifiers()));
+    }
+
+    private boolean schemaHasColumn(String field) {
+        return schema.getColumns().stream().anyMatch(col -> StringUtils.equalsIgnoreCase(col.getName(),field));
+    }
+
+    @Override
+    public P deserialize(KuduRow row) {
+        return mapFrom(row);
+    }
+
+    private P mapFrom(KuduRow row) {
+        P o = createInstance(clazz);
+
+        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+            Field[] fields = c.getDeclaredFields();
+
+            basicValidation(fields)
+                    .forEach(cField -> {
+                        try {
+                            cField.setAccessible(true);
+                            Object value = row.getField(cField.getName());
+                            if (value != null) {
+                                if (cField.getType() == value.getClass()) {
+                                    cField.set(o, value);
+                                } else if (cField.getType() == Long.class && value.getClass() == Date.class) {
+                                    cField.set(o, ((Date) value).getTime());
+                                } else {
+                                    cField.set(o, value);
+                                }
+                            }
+                        } catch (IllegalAccessException e) {
+                            String error = String.format("Cannot get value for %s", cField.getName());
+                            throw new IllegalArgumentException(error, e);
+                        }
+                    });
+        }
+
+        return o;
+
+    }
+
+    private P createInstance(Class<P> clazz) {
+        try {
+            Constructor<P> constructor = clazz.getDeclaredConstructor();
+            constructor.setAccessible(true);
+            return constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
+            throw new IllegalArgumentException(error, e);
+        }
+    }
+
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
index e282185..35982f4 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kudu;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -32,18 +33,18 @@ public class KuduOuputFormatTest extends KuduDatabase {
     @Test
     public void testInvalidKuduMaster() throws IOException {
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
     }
 
     @Test
     public void testInvalidTableInfo() throws IOException {
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe()));
     }
 
     @Test
     public void testNotTableExist() throws IOException {
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo);
+        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe());
         Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
     }
 
@@ -51,7 +52,7 @@ public class KuduOuputFormatTest extends KuduDatabase {
     public void testOutputWithStrongConsistency() throws Exception {
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
+        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe())
                 .withStrongConsistency();
         outputFormat.open(0,1);
 
@@ -70,7 +71,7 @@ public class KuduOuputFormatTest extends KuduDatabase {
     public void testOutputWithEventualConsistency() throws Exception {
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
+        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe())
                 .withEventualConsistency();
         outputFormat.open(0,1);
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
index a89580f..3ca9b9a 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
@@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -33,18 +34,18 @@ public class KuduSinkTest extends KuduDatabase {
     @Test
     public void testInvalidKuduMaster() throws IOException {
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
     }
 
     @Test
     public void testInvalidTableInfo() throws IOException {
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe()));
     }
 
     @Test
     public void testNotTableExist() throws IOException {
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo);
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe());
         Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
     }
 
@@ -52,7 +53,7 @@ public class KuduSinkTest extends KuduDatabase {
     public void testOutputWithStrongConsistency() throws Exception {
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe())
                 .withStrongConsistency();
         sink.open(new Configuration());
 
@@ -69,7 +70,7 @@ public class KuduSinkTest extends KuduDatabase {
     @Test
     public void testOutputWithEventualConsistency() throws Exception {
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe())
                 .withEventualConsistency();
         sink.open(new Configuration());
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
index 41e59b7..99efbd1 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
@@ -67,7 +67,7 @@ public class KuduDatabase {
             KuduConnector tableContext = new KuduConnector(hostsCluster, tableInfo);
             booksDataRow().forEach(row -> {
                 try {
-                    tableContext.writeRow(row, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT);
+                    tableContext.writeRow(row);
                 }catch (Exception e) {
                     e.printStackTrace();
                 }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
new file mode 100644
index 0000000..afe57ca
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PojoSerDeTest {
+
+    public class TestPojo {
+        private String field1;
+        private String field2;
+        private String field3;
+
+        public TestPojo() {
+            field1 = "field1";
+            field2 = "field2";
+            field3 = "field3";
+        }
+    }
+
+    @Test
+    public void testFieldsNotInSchema() {
+
+        TestPojo pojo = new TestPojo();
+
+        KuduTableInfo tableInfo = KuduTableInfo.Builder.create("test")
+                .addColumn(KuduColumnInfo.Builder.create("field1", Type.STRING).key(true).hashKey(true).build())
+                .addColumn(KuduColumnInfo.Builder.create("field2", Type.STRING).build())
+                .build();
+
+        KuduRow row = new PojoSerDe<>(TestPojo.class).withSchema(tableInfo.getSchema()).serialize(pojo);
+
+        Assertions.assertEquals(2, row.blindMap().size());
+        Assertions.assertEquals("field1", row.getField("field1"));
+        Assertions.assertEquals("field2", row.getField("field2"));
+
+    }
+}
