This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit f9c75bbc99f190fd562215ee1dfef53a36241cda Author: SuXingLee <913910...@qq.com> AuthorDate: Fri Mar 22 20:08:33 2019 +0800 [BAHIR-202] Improve KuduSink throughput using async FlushMode By default, KuduSink processes messages one by one without checkpointing. When checkpointing is enabled, throughput is improved by using FlushMode.AUTO_FLUSH_BACKGROUND, and checkpoints are used to ensure at-least-once delivery. Closes #50 --- .../connectors/kudu/KuduOutputFormat.java | 5 +- .../flink/streaming/connectors/kudu/KuduSink.java | 59 ++++++++++++++++++++-- .../connectors/kudu/connector/KuduConnector.java | 18 +++++-- 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java index 9d12710..c1301da 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; import org.apache.flink.util.Preconditions; +import org.apache.kudu.client.SessionConfiguration.FlushMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,8 @@ import java.io.IOException; public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); private String kuduMasters; @@ -87,7 +90,7 @@ public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> { @Override public void open(int taskNumber, int numTasks) throws IOException { if (connector != null) return; - connector = new
KuduConnector(kuduMasters, tableInfo, consistency, writeMode); + connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode,FlushMode.AUTO_FLUSH_SYNC); serializer = serializer.withSchema(tableInfo.getSchema()); } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java index 53cf249..b6dd9c8 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java @@ -17,18 +17,25 @@ package org.apache.flink.streaming.connectors.kudu; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization; import org.apache.flink.util.Preconditions; +import org.apache.kudu.client.SessionConfiguration.FlushMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -public class KuduSink<OUT> extends RichSinkFunction<OUT> { +public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); @@ -36,6 +43,7 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> { private 
KuduTableInfo tableInfo; private KuduConnector.Consistency consistency; private KuduConnector.WriteMode writeMode; + private FlushMode flushMode; private KuduSerialization<OUT> serializer; @@ -77,11 +85,42 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> { return this; } + public KuduSink<OUT> withSyncFlushMode() { + this.flushMode = FlushMode.AUTO_FLUSH_SYNC; + return this; + } + + public KuduSink<OUT> withAsyncFlushMode() { + this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND; + return this; + } + @Override public void open(Configuration parameters) throws IOException { - if (connector != null) return; - connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode); - serializer.withSchema(tableInfo.getSchema()); + if (this.connector != null) return; + this.connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, getflushMode()); + this.serializer.withSchema(tableInfo.getSchema()); + } + + /** + * If flink checkpoint is disable,synchronously write data to kudu. + * <p>If flink checkpoint is enable, asynchronously write data to kudu by default. + * + * <p>(Note: async may result in out-of-order writes to Kudu. + * you also can change to sync by explicitly calling {@link KuduSink#withSyncFlushMode()} when initializing KuduSink. 
) + * + * @return flushMode + */ + private FlushMode getflushMode() { + FlushMode flushMode = FlushMode.AUTO_FLUSH_SYNC; + boolean enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled(); + if(enableCheckpoint && this.flushMode == null) { + flushMode = FlushMode.AUTO_FLUSH_BACKGROUND; + } + if(enableCheckpoint && this.flushMode != null) { + flushMode = this.flushMode; + } + return flushMode; } @Override @@ -103,4 +142,16 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> { throw new IOException(e.getLocalizedMessage(), e); } } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotting state {} ...", context.getCheckpointId()); + } + this.connector.flush(); + } } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java index a3851c4..d45886c 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java @@ -18,9 +18,11 @@ package org.apache.flink.streaming.connectors.kudu.connector; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; + import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.common.time.Time; import org.apache.kudu.client.*; +import org.apache.kudu.client.SessionConfiguration.FlushMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +42,7 @@ public class KuduConnector implements AutoCloseable { private AsyncKuduClient client; private KuduTable table; + private AsyncKuduSession session; private Consistency 
consistency; private WriteMode writeMode; @@ -48,15 +51,17 @@ public class KuduConnector implements AutoCloseable { private static AtomicBoolean errorTransactions = new AtomicBoolean(false); public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException { - this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT); + this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT,FlushMode.AUTO_FLUSH_SYNC); } - public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode) throws IOException { + public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode,FlushMode flushMode) throws IOException { this.client = client(kuduMasters); this.table = table(tableInfo); + this.session = client.newSession(); this.consistency = consistency; this.writeMode = writeMode; this.defaultCB = new ResponseCallback(); + this.session.setFlushMode(flushMode); } private AsyncKuduClient client(String kuduMasters) { @@ -109,7 +114,6 @@ public class KuduConnector implements AutoCloseable { public boolean writeRow(KuduRow row) throws Exception { final Operation operation = KuduMapper.toOperation(table, writeMode, row); - AsyncKuduSession session = client.newSession(); Deferred<OperationResponse> response = session.apply(operation); if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) { @@ -119,7 +123,6 @@ public class KuduConnector implements AutoCloseable { processResponse(response.join()); } - session.close(); return !errorTransactions.get(); } @@ -131,10 +134,17 @@ public class KuduConnector implements AutoCloseable { Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds()); } + if (session == null) return; + session.close(); + if (client == null) return; client.close(); } + public void flush(){ + this.session.flush(); + } + private class ResponseCallback implements Callback<Boolean, 
OperationResponse> { @Override public Boolean call(OperationResponse operationResponse) {