This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new f449006 kinesis-sink: manage msg ordering for publish callback failure (#2285) f449006 is described below commit f4490067739ad82023df4bc7721c16dbb7245538 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Aug 2 12:35:48 2018 -0700 kinesis-sink: manage msg ordering for publish callback failure (#2285) --- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 49 ++++++++++++++-------- .../pulsar/io/kinesis/KinesisSinkConfig.java | 1 + 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index dc70b98..67de21a 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -44,6 +44,7 @@ import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -89,6 +90,12 @@ public class KinesisSink implements Sink<byte[]> { private static final String defaultPartitionedKey = "default"; private static final int maxPartitionedKeyLength = 256; private SinkContext sinkContext; + // + private static final int FALSE = 0; + private static final int TRUE = 1; + private volatile int previousPublishFailed = FALSE; + private static final AtomicIntegerFieldUpdater<KinesisSink> IS_PUBLISH_FAILED = + AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class, "previousPublishFailed"); public static final String ACCESS_KEY_NAME = "accessKey"; public static final String SECRET_KEY_NAME = "secretKey"; @@ -101,6 +108,12 @@ public class KinesisSink implements Sink<byte[]> { @Override public void write(Record<byte[]> record) throws Exception { + // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering + if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) { + LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName, + record.getRecordSequence()); + throw new IllegalStateException("kinesis queue has publish failure"); + } String partitionedKey = record.getKey().orElse(defaultPartitionedKey); partitionedKey = partitionedKey.length() > maxPartitionedKeyLength ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) @@ -109,7 +122,7 @@ public class KinesisSink implements Sink<byte[]> { ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName, partitionedKey, data); addCallback(addRecordResult, - ProducerSendCallback.create(this.streamName, record, System.nanoTime(), sinkContext), directExecutor()); + ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor()); if (sinkContext != null) { sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1); sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length); @@ -151,6 +164,7 @@ public class KinesisSink implements Sink<byte[]> { this.streamName = kinesisSinkConfig.getAwsKinesisStreamName(); this.kinesisProducer = new KinesisProducer(kinesisConfig); + IS_PUBLISH_FAILED.set(this, FALSE); LOG.info("Kinesis sink started. {}", (ReflectionToStringBuilder.toString(kinesisConfig, ToStringStyle.SHORT_PREFIX_STYLE))); } @@ -167,30 +181,26 @@ public class KinesisSink implements Sink<byte[]> { private static final class ProducerSendCallback implements FutureCallback<UserRecordResult> { private Record<byte[]> resultContext; - private String streamName; private long startTime = 0; private final Handle<ProducerSendCallback> recyclerHandle; - private SinkContext sinkContext; + private KinesisSink kinesisSink; private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) { this.recyclerHandle = recyclerHandle; } - static ProducerSendCallback create(String streamName, Record<byte[]> resultContext, long startTime, - SinkContext sinkContext) { + static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime) { ProducerSendCallback sendCallback = RECYCLER.get(); sendCallback.resultContext = resultContext; - sendCallback.streamName = streamName; + sendCallback.kinesisSink = kinesisSink; sendCallback.startTime = startTime; - sendCallback.sinkContext = sinkContext; return sendCallback; } private void recycle() { resultContext = null; - streamName = null; + kinesisSink = null; startTime = 0; - sinkContext = null; recyclerHandle.recycle(this); } @@ -204,23 +214,28 @@ public class KinesisSink implements Sink<byte[]> { @Override public void onSuccess(UserRecordResult result) { if (LOG.isDebugEnabled()) { - LOG.debug("Successfully published message for {}-{} with latency", this.streamName, result.getShardId(), + LOG.debug("Successfully published message for {}-{} with latency {}", kinesisSink.streamName, result.getShardId(), TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime))); } - this.resultContext.ack(); - if (sinkContext != null) { - sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1); + if (kinesisSink.sinkContext != null) { + kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1); + } + if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && kinesisSink.previousPublishFailed == TRUE) { + LOG.warn("Skip acking message to retain ordering with previous failed message {}-{} on shard {}", + kinesisSink.streamName, resultContext.getRecordSequence(), result.getShardId()); + } else { + this.resultContext.ack(); } recycle(); } @Override public void onFailure(Throwable exception) { - LOG.error("[{}] Failed to published message for replicator of {}-{} ", streamName, + LOG.error("[{}] Failed to published message for replicator of {}-{} ", kinesisSink.streamName, resultContext.getPartitionId(), resultContext.getRecordSequence()); - this.resultContext.fail(); - if (sinkContext != null) { - sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1); + kinesisSink.previousPublishFailed = TRUE; + if (kinesisSink.sinkContext != null) { + kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1); } recycle(); } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index bf5f2ea..ba476ab 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -45,6 +45,7 @@ public class KinesisSinkConfig implements Serializable { private String awsCredentialPluginName; private String awsCredentialPluginParam; private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD + private boolean retainOrdering; public static KinesisSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory());