This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f449006  kinesis-sink: manage msg ordering for publish callback failure (#2285)
f449006 is described below

commit f4490067739ad82023df4bc7721c16dbb7245538
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Aug 2 12:35:48 2018 -0700

    kinesis-sink: manage msg ordering for publish callback failure (#2285)
---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 49 ++++++++++++++--------
 .../pulsar/io/kinesis/KinesisSinkConfig.java       |  1 +
 2 files changed, 33 insertions(+), 17 deletions(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dc70b98..67de21a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -44,6 +44,7 @@ import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -89,6 +90,12 @@ public class KinesisSink implements Sink<byte[]> {
     private static final String defaultPartitionedKey = "default";
     private static final int maxPartitionedKeyLength = 256;
     private SinkContext sinkContext;
+    // 
+    private static final int FALSE = 0;
+    private static final int TRUE = 1;
+    private volatile int previousPublishFailed = FALSE;
+    private static final AtomicIntegerFieldUpdater<KinesisSink> 
IS_PUBLISH_FAILED =
+            AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class, 
"previousPublishFailed");
 
     public static final String ACCESS_KEY_NAME = "accessKey";
     public static final String SECRET_KEY_NAME = "secretKey";
@@ -101,6 +108,12 @@ public class KinesisSink implements Sink<byte[]> {
     
     @Override
     public void write(Record<byte[]> record) throws Exception {
+        // kpl-thread captures publish-failure. fail the publish on main 
pulsar-io-thread to maintain the ordering 
+        if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == 
TRUE) {
+            LOG.warn("Skip acking message to retain ordering with previous 
failed message {}-{}", this.streamName,
+                    record.getRecordSequence());
+            throw new IllegalStateException("kinesis queue has publish 
failure");
+        }
         String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
@@ -109,7 +122,7 @@ public class KinesisSink implements Sink<byte[]> {
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey, data);
         addCallback(addRecordResult,
-                ProducerSendCallback.create(this.streamName, record, 
System.nanoTime(), sinkContext), directExecutor());
+                ProducerSendCallback.create(this, record, System.nanoTime()), 
directExecutor());
         if (sinkContext != null) {
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, 
data.array().length);
@@ -151,6 +164,7 @@ public class KinesisSink implements Sink<byte[]> {
 
         this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
         this.kinesisProducer = new KinesisProducer(kinesisConfig);
+        IS_PUBLISH_FAILED.set(this, FALSE);
 
         LOG.info("Kinesis sink started. {}", 
(ReflectionToStringBuilder.toString(kinesisConfig, 
ToStringStyle.SHORT_PREFIX_STYLE)));
     }
@@ -167,30 +181,26 @@ public class KinesisSink implements Sink<byte[]> {
     private static final class ProducerSendCallback implements 
FutureCallback<UserRecordResult> {
 
         private Record<byte[]> resultContext;
-        private String streamName;
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
-        private SinkContext sinkContext;
+        private KinesisSink kinesisSink;
 
         private ProducerSendCallback(Handle<ProducerSendCallback> 
recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(String streamName, Record<byte[]> 
resultContext, long startTime,
-                SinkContext sinkContext) {
+        static ProducerSendCallback create(KinesisSink kinesisSink, 
Record<byte[]> resultContext, long startTime) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.resultContext = resultContext;
-            sendCallback.streamName = streamName;
+            sendCallback.kinesisSink = kinesisSink;
             sendCallback.startTime = startTime;
-            sendCallback.sinkContext = sinkContext;
             return sendCallback;
         }
 
         private void recycle() {
             resultContext = null;
-            streamName = null;
+            kinesisSink = null;
             startTime = 0;
-            sinkContext = null;
             recyclerHandle.recycle(this);
         }
 
@@ -204,23 +214,28 @@ public class KinesisSink implements Sink<byte[]> {
         @Override
         public void onSuccess(UserRecordResult result) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Successfully published message for {}-{} with 
latency", this.streamName, result.getShardId(),
+                LOG.debug("Successfully published message for {}-{} with 
latency {}", kinesisSink.streamName, result.getShardId(),
                         TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - 
startTime)));
             }
-            this.resultContext.ack();
-            if (sinkContext != null) {
-                sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+            if (kinesisSink.sinkContext != null) {
+                kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+            }
+            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && 
kinesisSink.previousPublishFailed == TRUE) {
+                LOG.warn("Skip acking message to retain ordering with previous 
failed message {}-{} on shard {}",
+                        kinesisSink.streamName, 
resultContext.getRecordSequence(), result.getShardId());
+            } else {
+                this.resultContext.ack();
             }
             recycle();
         }
 
         @Override
         public void onFailure(Throwable exception) {
-            LOG.error("[{}] Failed to published message for replicator of 
{}-{} ", streamName,
+            LOG.error("[{}] Failed to published message for replicator of 
{}-{} ", kinesisSink.streamName,
                     resultContext.getPartitionId(), 
resultContext.getRecordSequence());
-            this.resultContext.fail();
-            if (sinkContext != null) {
-                sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
+            kinesisSink.previousPublishFailed = TRUE;
+            if (kinesisSink.sinkContext != null) {
+                kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
             }
             recycle();
         }
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index bf5f2ea..ba476ab 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -45,6 +45,7 @@ public class KinesisSinkConfig implements Serializable {
     private String awsCredentialPluginName;
     private String awsCredentialPluginParam;
     private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // 
default : ONLY_RAW_PAYLOAD
+    private boolean retainOrdering;
 
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

Reply via email to