This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 34750d5 STORM-3421: Fix checkstyle violations in storm-kinesis new f617c6e Merge pull request #3036 from krichter722/checkstyle-kinesis 34750d5 is described below commit 34750d557b82cbdfdef379ab9b7162142a06b169 Author: Karl-Philipp Richter <krich...@posteo.de> AuthorDate: Mon Jun 24 01:15:03 2019 +0200 STORM-3421: Fix checkstyle violations in storm-kinesis --- external/storm-kinesis/pom.xml | 2 +- .../kinesis/spout/CredentialsProviderChain.java | 5 +- .../kinesis/spout/ExponentialBackoffRetrier.java | 33 +++-- .../kinesis/spout/FailedMessageRetryHandler.java | 22 +-- .../apache/storm/kinesis/spout/KinesisConfig.java | 59 ++++---- .../storm/kinesis/spout/KinesisConnection.java | 43 ++++-- .../storm/kinesis/spout/KinesisConnectionInfo.java | 26 ++-- .../storm/kinesis/spout/KinesisMessageId.java | 40 +++--- .../storm/kinesis/spout/KinesisRecordsManager.java | 157 +++++++++++++-------- .../apache/storm/kinesis/spout/KinesisSpout.java | 8 +- .../storm/kinesis/spout/RecordToTupleMapper.java | 12 +- .../spout/{ZKConnection.java => ZkConnection.java} | 26 ++-- .../org/apache/storm/kinesis/spout/ZkInfo.java | 34 +++-- 13 files changed, 279 insertions(+), 188 deletions(-) diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml index 6f04950..39c89f1 100644 --- a/external/storm-kinesis/pom.xml +++ b/external/storm-kinesis/pom.xml @@ -62,7 +62,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>185</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> <plugin> diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java index 4287ae0..5820851 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java @@ -26,10 +26,11 @@ import com.amazonaws.auth.SystemPropertiesCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; /** - * Class representing chain of mechanisms that will be used in order to connect to kinesis + * Class representing chain of mechanisms that will be used in order to connect to kinesis. */ public class CredentialsProviderChain extends AWSCredentialsProviderChain { - public CredentialsProviderChain () { + + public CredentialsProviderChain() { super(new EnvironmentVariableCredentialsProvider(), new SystemPropertiesCredentialsProvider(), new ClasspathPropertiesFileCredentialsProvider(), diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java index 88e8d70..2920913 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java @@ -18,9 +18,6 @@ package org.apache.storm.kinesis.spout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.Serializable; import java.util.Comparator; import java.util.HashMap; @@ -28,6 +25,9 @@ import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable { private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class); // Wait interfal for retrying after first failure @@ -44,29 +44,29 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator()); /** - * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) secs for - * retry i where i = 2,3, + * No args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an + * exponential backoff of {@code Math.pow(2,i-1)} secs for retry {@code i} where {@code i = 2,3,...}. */ - public ExponentialBackoffRetrier () { + public ExponentialBackoffRetrier() { this(100L, 2L, Long.MAX_VALUE); } /** - * + * Creates a new exponential backoff retrier. * @param initialDelayMillis delay in milliseconds for first retry * @param baseSeconds base for exponent function in seconds * @param maxRetries maximum number of retries before the record is discarded/acked */ - public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) { + public ExponentialBackoffRetrier(Long initialDelayMillis, Long baseSeconds, Long maxRetries) { this.initialDelayMillis = initialDelayMillis; this.baseSeconds = baseSeconds; this.maxRetries = maxRetries; validate(); } - private void validate () { + private void validate() { if (initialDelayMillis < 0) { - throw new IllegalArgumentException("initialDelayMillis cannot be negative." ); + throw new IllegalArgumentException("initialDelayMillis cannot be negative."); } if (baseSeconds < 0) { throw new IllegalArgumentException("baseSeconds cannot be negative."); @@ -75,6 +75,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser throw new IllegalArgumentException("maxRetries cannot be negative."); } } + @Override public boolean failed(KinesisMessageId messageId) { LOG.debug("Handling failed message {}", messageId); @@ -114,7 +115,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser public KinesisMessageId getNextFailedMessageToRetry() { KinesisMessageId result = null; // return the first message to be retried from the set. It will return the message with the earliest retry time <= current time - if (!retryMessageSet.isEmpty() ) { + if (!retryMessageSet.isEmpty()) { result = retryMessageSet.first(); if (!(retryTimes.get(result) <= System.nanoTime())) { result = null; @@ -126,15 +127,19 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser @Override public void failedMessageEmitted(KinesisMessageId messageId) { - // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and wait for its ack or fail + // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and + // wait for its ack or fail // but still keep it in counts map to retry again on failure or remove on ack LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId); retryMessageSet.remove(messageId); retryTimes.remove(messageId); } - // private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to Long.MAX_VALUE) - private Long getRetryTime (Long retryNum) { + /** + * private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to + * Long.MAX_VALUE). + */ + private Long getRetryTime(Long retryNum) { Long retryTime = System.nanoTime(); Long nanoMultiplierForMillis = 1000000L; // if first retry then retry time = current time + initial delay diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java index bb0e450..c005c59 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java @@ -22,27 +22,27 @@ import java.io.Serializable; public interface FailedMessageRetryHandler extends Serializable { /** - * message with messageId failed in the spout - * @param messageId + * message with messageId failed in the spout. + * @param messageId the message id * @return true if this failed message was scheduled to be retried, false otherwise */ - boolean failed (KinesisMessageId messageId); + boolean failed(KinesisMessageId messageId); /** - * message with messageId succeeded/acked in the spout - * @param messageId + * message with messageId succeeded/acked in the spout. + * @param messageId the message id */ - void acked (KinesisMessageId messageId); + void acked(KinesisMessageId messageId); /** - * Get the next failed message's id to retry if any, null otherwise + * Get the next failed message's id to retry if any, null otherwise. * @return messageId */ - KinesisMessageId getNextFailedMessageToRetry (); + KinesisMessageId getNextFailedMessageToRetry(); /** - * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout - * @param messageId + * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout. + * @param messageId the message id */ - void failedMessageEmitted (KinesisMessageId messageId); + void failedMessageEmitted(KinesisMessageId messageId); } diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java index 744aef5..eeb5896 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java @@ -38,13 +38,22 @@ public class KinesisConfig implements Serializable { private final ZkInfo zkInfo; // object representing information on paramaters to use while connecting to kinesis using kinesis client private final KinesisConnectionInfo kinesisConnectionInfo; - // this number represents the number of messages that are still not committed to zk. it will prevent the spout from emitting further. - // for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still cant be committed to zk because 1 is still in failed set. As a result - // the acked queue can infinitely grow without any of them being committed to zk. topology max pending does not help since from storm's view they are acked + /** + * This number represents the number of messages that are still not committed to zk. it will prevent the spout from + * emitting further. for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still can't be + * committed to zk because 1 is still in failed set. As a result the acked queue can infinitely grow without any of + * them being committed to zk. topology max pending does not help since from storm's view they are acked + */ private final Long maxUncommittedRecords; - public KinesisConfig(String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp, FailedMessageRetryHandler - failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) { + public KinesisConfig(String streamName, + ShardIteratorType shardIteratorType, + RecordToTupleMapper recordToTupleMapper, + Date timestamp, + FailedMessageRetryHandler failedMessageRetryHandler, + ZkInfo zkInfo, + KinesisConnectionInfo kinesisConnectionInfo, + Long maxUncommittedRecords) { this.streamName = streamName; this.shardIteratorType = shardIteratorType; this.recordToTupleMapper = recordToTupleMapper; @@ -56,14 +65,16 @@ public class KinesisConfig implements Serializable { validate(); } - private void validate () { + private void validate() { if (streamName == null || streamName.length() < 1) { throw new IllegalArgumentException("streamName is required and cannot be of length 0."); } - if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType - .AT_SEQUENCE_NUMBER)) { - throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST + - "," + ShardIteratorType.TRIM_HORIZON); + if (shardIteratorType == null + || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) + || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) { + throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + + "," + ShardIteratorType.LATEST + + "," + ShardIteratorType.TRIM_HORIZON); } if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) { throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP); @@ -101,33 +112,33 @@ public class KinesisConfig implements Serializable { return timestamp; } - public FailedMessageRetryHandler getFailedMessageRetryHandler () { + public FailedMessageRetryHandler getFailedMessageRetryHandler() { return failedMessageRetryHandler; } - public ZkInfo getZkInfo () { + public ZkInfo getZkInfo() { return zkInfo; } - public KinesisConnectionInfo getKinesisConnectionInfo () { + public KinesisConnectionInfo getKinesisConnectionInfo() { return kinesisConnectionInfo; } - public Long getMaxUncommittedRecords () { + public Long getMaxUncommittedRecords() { return maxUncommittedRecords; } @Override public String toString() { - return "KinesisConfig{" + - "streamName='" + streamName + '\'' + - ", shardIteratorType=" + shardIteratorType + - ", recordToTupleMapper=" + recordToTupleMapper + - ", timestamp=" + timestamp + - ", zkInfo=" + zkInfo + - ", kinesisConnectionInfo=" + kinesisConnectionInfo + - ", failedMessageRetryHandler =" + failedMessageRetryHandler + - ", maxUncommittedRecords=" + maxUncommittedRecords + - '}'; + return "KinesisConfig{" + + "streamName='" + streamName + '\'' + + ", shardIteratorType=" + shardIteratorType + + ", recordToTupleMapper=" + recordToTupleMapper + + ", timestamp=" + timestamp + + ", zkInfo=" + zkInfo + + ", kinesisConnectionInfo=" + kinesisConnectionInfo + + ", failedMessageRetryHandler =" + failedMessageRetryHandler + + ", maxUncommittedRecords=" + maxUncommittedRecords + + '}'; } } diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java index dfd9049..b8b969a 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java @@ -28,28 +28,30 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + class KinesisConnection { private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); private final KinesisConnectionInfo kinesisConnectionInfo; private AmazonKinesisClient kinesisClient; - KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) { + KinesisConnection(KinesisConnectionInfo kinesisConnectionInfo) { this.kinesisConnectionInfo = kinesisConnectionInfo; } - void initialize () { - kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration()); + void initialize() { + kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), + kinesisConnectionInfo.getClientConfiguration()); kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion())); } - List<Shard> getShardsForStream (String stream) { + List<Shard> getShardsForStream(String stream) { DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(stream); List<Shard> shards = new ArrayList<>(); @@ -63,19 +65,24 @@ class KinesisConnection { } else { exclusiveStartShardId = null; } - } while ( exclusiveStartShardId != null ); + } while (exclusiveStartShardId != null); LOG.info("Number of shards for stream " + stream + " are " + shards.size()); return shards; } - String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) { + String getShardIterator(String stream, + String shardId, + ShardIteratorType shardIteratorType, + String sequenceNumber, + Date timestamp) { String shardIterator = ""; try { GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(stream); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(shardIteratorType); - if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) { + if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) + || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) { getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber); } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) { getShardIteratorRequest.setTimestamp(timestamp); @@ -85,15 +92,21 @@ class KinesisConnection { shardIterator = getShardIteratorResult.getShardIterator(); } } catch (Exception e) { - LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " + - sequenceNumber + " timestamp " + timestamp, e); + LOG.warn("Exception occured while getting shardIterator for shard " + shardId + + " shardIteratorType " + shardIteratorType + + " sequence number " + sequenceNumber + + " timestamp " + timestamp, + e); } - LOG.warn("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " + - sequenceNumber + " timestamp" + timestamp); + LOG.warn("Returning shardIterator " + shardIterator + + " for shardId " + shardId + + " shardIteratorType " + shardIteratorType + + " sequenceNumber " + sequenceNumber + + " timestamp" + timestamp); return shardIterator; } - GetRecordsResult fetchRecords (String shardIterator) { + GetRecordsResult fetchRecords(String shardIterator) { GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit()); @@ -101,7 +114,7 @@ class KinesisConnection { return getRecordsResult; } - void shutdown () { + void shutdown() { kinesisClient.shutdown(); } diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java index 67ca29f..121e0bf 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java @@ -24,13 +24,14 @@ import com.amazonaws.regions.Regions; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import org.objenesis.strategy.StdInstantiatorStrategy; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Serializable; import java.util.Arrays; +import org.objenesis.strategy.StdInstantiatorStrategy; + public class KinesisConnectionInfo implements Serializable { private final byte[] serializedKinesisCredsProvider; private final byte[] serializedkinesisClientConfig; @@ -41,13 +42,16 @@ public class KinesisConnectionInfo implements Serializable { private transient ClientConfiguration clientConfiguration; /** - * + * Create a new Kinesis connection info. * @param credentialsProvider implementation to provide credentials to connect to kinesis * @param clientConfiguration client configuration to pass to kinesis client * @param region region to connect to * @param recordsLimit max records to be fetched in a getRecords request to kinesis */ - public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) { + public KinesisConnectionInfo(AWSCredentialsProvider credentialsProvider, + ClientConfiguration clientConfiguration, + Regions region, + Integer recordsLimit) { if (recordsLimit == null || recordsLimit <= 0) { throw new IllegalArgumentException("recordsLimit has to be a positive integer"); } @@ -82,7 +86,7 @@ public class KinesisConnectionInfo implements Serializable { return region; } - private byte[] getKryoSerializedBytes (final Object obj) { + private byte[] getKryoSerializedBytes(final Object obj) { final Kryo kryo = new Kryo(); final ByteArrayOutputStream os = new ByteArrayOutputStream(); final Output output = new Output(os); @@ -92,7 +96,7 @@ public class KinesisConnectionInfo implements Serializable { return os.toByteArray(); } - private Object getKryoDeserializedObject (final byte[] ser) { + private Object getKryoDeserializedObject(final byte[] ser) { final Kryo kryo = new Kryo(); final Input input = new Input(new ByteArrayInputStream(ser)); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); @@ -101,11 +105,11 @@ public class KinesisConnectionInfo implements Serializable { @Override public String toString() { - return "KinesisConnectionInfo{" + - "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider) + - ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig) + - ", region=" + region + - ", recordsLimit=" + recordsLimit + - '}'; + return "KinesisConnectionInfo{" + + "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider) + + ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig) + + ", region=" + region + + ", recordsLimit=" + recordsLimit + + '}'; } } diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java index dd239f1..7fd179f 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java @@ -23,48 +23,56 @@ public class KinesisMessageId { private final String shardId; private final String sequenceNumber; - KinesisMessageId (String streamName, String shardId, String sequenceNumber) { + KinesisMessageId(String streamName, String shardId, String sequenceNumber) { this.streamName = streamName; this.shardId = shardId; this.sequenceNumber = sequenceNumber; } - public String getStreamName () { + public String getStreamName() { return streamName; } - public String getShardId () { + public String getShardId() { return shardId; } - public String getSequenceNumber () { + public String getSequenceNumber() { return sequenceNumber; } @Override - public String toString () { - return "KinesisMessageId{" + - "streamName='" + streamName + '\'' + - ", shardId='" + shardId + '\'' + - ", sequenceNumber='" + sequenceNumber + '\'' + - '}'; + public String toString() { + return "KinesisMessageId{" + + "streamName='" + streamName + '\'' + + ", shardId='" + shardId + '\'' + + ", sequenceNumber='" + sequenceNumber + '\'' + + '}'; } @Override - public boolean equals (Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } KinesisMessageId that = (KinesisMessageId) o; - if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) return false; - if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) return false; + if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) { + return false; + } + if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) { + return false; + } return !(sequenceNumber != null ? !sequenceNumber.equals(that.sequenceNumber) : that.sequenceNumber != null); } @Override - public int hashCode () { + public int hashCode() { int result = streamName != null ? streamName.hashCode() : 0; result = 31 * result + (shardId != null ? shardId.hashCode() : 0); result = 31 * result + (sequenceNumber != null ? sequenceNumber.hashCode() : 0); diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java index 7831193..33dba4a 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java @@ -24,9 +24,6 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; -import org.apache.storm.spout.SpoutOutputCollector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.math.BigInteger; import java.util.HashMap; @@ -36,20 +33,27 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; +import org.apache.storm.spout.SpoutOutputCollector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + class KinesisRecordsManager { private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); // object handling zk interaction - private transient ZKConnection zkConnection; + private transient ZkConnection zkConnection; // object handling interaction with kinesis private transient KinesisConnection kinesisConnection; // Kinesis Spout KinesisConfig object - private transient final KinesisConfig kinesisConfig; + private final transient KinesisConfig kinesisConfig; // Queue of records per shard fetched from kinesis and are waiting to be emitted private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>(); // Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>(); - // Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the - // sequence number to commit. Logic explained in commit + /** + * Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. + * At the same time order is needed to figure out the sequence number to commit. Logic explained in commit + */ private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>(); // sorted acked sequence numbers - needed to figure out what sequence number can be committed private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>(); @@ -66,13 +70,13 @@ class KinesisRecordsManager { // boolean to track deactivated state private transient boolean deactivated; - KinesisRecordsManager (KinesisConfig kinesisConfig) { + KinesisRecordsManager(KinesisConfig kinesisConfig) { this.kinesisConfig = kinesisConfig; - this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo()); + this.zkConnection = new ZkConnection(kinesisConfig.getZkInfo()); this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo()); } - void initialize (int myTaskIndex, int totalTasks) { + void initialize(int myTaskIndex, int totalTasks) { deactivated = false; lastCommitTime = System.currentTimeMillis(); kinesisConnection.initialize(); @@ -90,7 +94,7 @@ class KinesisRecordsManager { refreshShardIteratorsForNewRecords(); } - void next (SpoutOutputCollector collector) { + void next(SpoutOutputCollector collector) { if (shouldCommit()) { commit(); } @@ -98,7 +102,8 @@ class KinesisRecordsManager { if (failedMessageId != null) { // if the retry service returns a message that is not in failed set then ignore it. should never happen BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber()); - if (failedPerShard.containsKey(failedMessageId.getShardId()) && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) { + if (failedPerShard.containsKey(failedMessageId.getShardId()) + && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) { if (!failedandFetchedRecords.containsKey(failedMessageId)) { fetchFailedRecords(failedMessageId); } @@ -107,8 +112,8 @@ class KinesisRecordsManager { kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId); return; } else { - LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " + - "infinitely"); + LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId + + ". This can happen a few times but should not happen infinitely"); } } else { LOG.warn("failedPerShard does not contain " + failedMessageId + ". This should never happen."); @@ -132,29 +137,34 @@ class KinesisRecordsManager { emitNewRecord(collector); } - void ack (KinesisMessageId kinesisMessageId) { + void ack(KinesisMessageId kinesisMessageId) { // for an acked message add it to acked set and remove it from emitted and failed String shardId = kinesisMessageId.getShardId(); BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber()); LOG.debug("Ack received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber); - // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while committing we need to figure out what is the + // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while + // committing we need to figure out what is the // highest sequence number that can be committed for this shard if (!ackedPerShard.containsKey(shardId)) { ackedPerShard.put(shardId, new TreeSet<BigInteger>()); } ackedPerShard.get(shardId).add(sequenceNumber); - // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard(which keeps track of in flight tuples) + // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard (which + // keeps track of in flight tuples) if (emittedPerShard.containsKey(shardId)) { TreeSet<BigInteger> emitted = emittedPerShard.get(shardId); emitted.remove(sequenceNumber); } - // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive coding. + // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard + // from failedPerShard. Defensive coding. // Remove it from failedPerShard anyway if (failedPerShard.containsKey(shardId)) { failedPerShard.get(shardId).remove(sequenceNumber); } - // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in failedAndFetchedRecords. We use that to - // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to clean up memory + // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in + // failedAndFetchedRecords. We use that to + // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to + // clean up memory if (failedandFetchedRecords.containsKey(kinesisMessageId)) { kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId); failedandFetchedRecords.remove(kinesisMessageId); @@ -165,7 +175,7 @@ class KinesisRecordsManager { } } - void fail (KinesisMessageId kinesisMessageId) { + void fail(KinesisMessageId kinesisMessageId) { String shardId = kinesisMessageId.getShardId(); BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber()); LOG.debug("Fail received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber); @@ -186,13 +196,18 @@ class KinesisRecordsManager { } } - void commit () { - // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number can be committed to zookeeper. - // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack it moves from emittedPerShard to - // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to failedPerShard. The failed records will move from + void commit() { + // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number + // can be committed to zookeeper. + // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack + // it moves from emittedPerShard to + // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to + // failedPerShard. The failed records will move from // failedPerShard to emittedPerShard when the failed record is emitted again as a retry. - // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard called X such that there is no sequence - // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and + // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard + // called X such that there is no sequence + // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5, + // emittedPerShard is 2,6 and // failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed for (String shardId: toEmitPerShard.keySet()) { if (ackedPerShard.containsKey(shardId)) { @@ -202,7 +217,8 @@ class KinesisRecordsManager { } if (emittedPerShard.containsKey(shardId) && !emittedPerShard.get(shardId).isEmpty()) { BigInteger smallestEmittedSequenceNumber = emittedPerShard.get(shardId).first(); - if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) { + if (commitSequenceNumberBound == null + || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) { commitSequenceNumberBound = smallestEmittedSequenceNumber; } } @@ -210,7 +226,8 @@ class KinesisRecordsManager { BigInteger ackedSequenceNumberToCommit = null; while (ackedSequenceNumbers.hasNext()) { BigInteger ackedSequenceNumber = ackedSequenceNumbers.next(); - if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) { + if (commitSequenceNumberBound == null + || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) { ackedSequenceNumberToCommit = ackedSequenceNumber; ackedSequenceNumbers.remove(); } else { @@ -220,7 +237,9 @@ class KinesisRecordsManager { if (ackedSequenceNumberToCommit != null) { Map<Object, Object> state = new HashMap<>(); state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString()); - LOG.debug("Committing sequence number {} for shardId {}", ackedSequenceNumberToCommit.toString(), shardId); + LOG.debug("Committing sequence number {} for shardId {}", + ackedSequenceNumberToCommit.toString(), + shardId); zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state); } } @@ -228,34 +247,38 @@ class KinesisRecordsManager { lastCommitTime = System.currentTimeMillis(); } - void activate () { + void activate() { LOG.info("Activate called"); deactivated = false; kinesisConnection.initialize(); } - void deactivate () { + void deactivate() { LOG.info("Deactivate called"); deactivated = true; commit(); kinesisConnection.shutdown(); } - void close () { + void close() { commit(); kinesisConnection.shutdown(); zkConnection.shutdown(); } - // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched and are in the failed queue will also + // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched + // and are in the failed queue will also // be kept in memory to avoid going to kinesis again for retry - private void fetchFailedRecords (KinesisMessageId kinesisMessageId) { + private void fetchFailedRecords(KinesisMessageId kinesisMessageId) { // if shard iterator not present for this message, get it if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) { refreshShardIteratorForFailedRecord(kinesisMessageId); } String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId); - LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}", kinesisMessageId.getShardId(), kinesisMessageId.getSequenceNumber(), shardIterator); + LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}", + kinesisMessageId.getShardId(), + kinesisMessageId.getSequenceNumber(), + shardIterator); try { GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator); if (getRecordsResult != null) { @@ -269,7 +292,9 @@ class KinesisRecordsManager { } else { // add all fetched records to the set of failed records if they are present in failed set for (Record record: records) { - KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber()); + KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), + kinesisMessageId.getShardId(), + record.getSequenceNumber()); if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) { failedandFetchedRecords.put(current, record); shardIteratorPerFailedMessage.remove(current); @@ -292,12 +317,15 @@ class KinesisRecordsManager { } } - private void fetchNewRecords () { + private void fetchNewRecords() { for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) { String shardId = entry.getKey(); try { String shardIterator = shardIteratorPerShard.get(shardId); - LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}", shardId, shardIterator, fetchedSequenceNumberPerShard.get(shardId)); + LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}", + shardId, + shardIterator, + fetchedSequenceNumberPerShard.get(shardId)); GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator); if (getRecordsResult != null) { List<Record> records = getRecordsResult.getRecords(); @@ -328,28 +356,30 @@ class KinesisRecordsManager { } } - private void emitNewRecord (SpoutOutputCollector collector) { + private void emitNewRecord(SpoutOutputCollector collector) { for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) { String shardId = entry.getKey(); LinkedList<Record> listOfRecords = entry.getValue(); Record record; while ((record = listOfRecords.pollFirst()) != null) { - KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(), shardId, record.getSequenceNumber()); + KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(), + shardId, + record.getSequenceNumber()); if (emitRecord(collector, record, kinesisMessageId)) { - return; + return; } } } } - private boolean emitFailedRecord (SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) { + private boolean emitFailedRecord(SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) { if (!failedandFetchedRecords.containsKey(kinesisMessageId)) { return false; } return emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId); } - private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) { + private boolean emitRecord(SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) { boolean result = false; List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record); // if a record is returned put the sequence number in the emittedPerShard to tie back with ack or fail @@ -368,11 +398,11 @@ class KinesisRecordsManager { return result; } - private boolean shouldCommit () { + private boolean shouldCommit() { return (System.currentTimeMillis() - lastCommitTime >= kinesisConfig.getZkInfo().getCommitIntervalMs()); } - private void initializeFetchedSequenceNumbers () { + private void initializeFetchedSequenceNumbers() { for (String shardId : toEmitPerShard.keySet()) { Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId); // if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber @@ -386,38 +416,47 @@ class KinesisRecordsManager { } } - private void refreshShardIteratorsForNewRecords () { + private void refreshShardIteratorsForNewRecords() { for (String shardId: toEmitPerShard.keySet()) { refreshShardIteratorForNewRecords(shardId); } } - private void refreshShardIteratorForNewRecords (String shardId) { + private void refreshShardIteratorForNewRecords(String shardId) { String shardIterator = null; String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId); - ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? kinesisConfig.getShardIteratorType() : ShardIteratorType - .AFTER_SEQUENCE_NUMBER); + ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null + ? kinesisConfig.getShardIteratorType() + : ShardIteratorType.AFTER_SEQUENCE_NUMBER); // Set the shard iterator for last fetched sequence number to start from correct position in shard - shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig - .getTimestamp()); + shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), + shardId, + shardIteratorType, + lastFetchedSequenceNumber, + kinesisConfig.getTimestamp()); if (shardIterator != null && !shardIterator.isEmpty()) { - LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator); + LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + + " with shardIterator " + shardIterator); shardIteratorPerShard.put(shardId, shardIterator); } } - private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) { + private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) { String shardIterator = null; // Set the shard iterator for last fetched sequence number to start from correct position in shard - shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType - .AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null); + shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), + kinesisMessageId.getShardId(), + ShardIteratorType.AT_SEQUENCE_NUMBER, + kinesisMessageId.getSequenceNumber(), + null); if (shardIterator != null && !shardIterator.isEmpty()) { - LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator); + LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + + " with shardIterator " + shardIterator); shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator); } } - private Long getUncommittedRecordsCount () { + private Long getUncommittedRecordsCount() { Long result = 0L; for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) { result += emitted.getValue().size(); @@ -432,7 +471,7 @@ class KinesisRecordsManager { return result; } - private boolean shouldFetchNewRecords () { + private boolean shouldFetchNewRecords() { // check to see if any shard has already fetched records waiting to be emitted, in which case dont fetch more boolean fetchRecords = true; for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) { diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java index da08a50..cf9295a 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java @@ -18,20 +18,20 @@ package org.apache.storm.kinesis.spout; +import java.util.Map; + import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; -import java.util.Map; - public class KinesisSpout extends BaseRichSpout { private final KinesisConfig kinesisConfig; private transient KinesisRecordsManager kinesisRecordsManager; private transient SpoutOutputCollector collector; - public KinesisSpout (KinesisConfig kinesisConfig) { + public KinesisSpout(KinesisConfig kinesisConfig) { this.kinesisConfig = kinesisConfig; } @@ -41,7 +41,7 @@ public class KinesisSpout extends BaseRichSpout { } @Override - public Map<String, Object> getComponentConfiguration () { + public Map<String, Object> getComponentConfiguration() { return super.getComponentConfiguration(); } diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java index c806539..bee9e80 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java @@ -15,24 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.kinesis.spout; import com.amazonaws.services.kinesis.model.Record; -import org.apache.storm.tuple.Fields; import java.util.List; +import org.apache.storm.tuple.Fields; + public interface RecordToTupleMapper { /** - * + * Retrieve the names of fields. * @return names of fields in the emitted tuple */ - Fields getOutputFields (); + Fields getOutputFields(); /** - * + * Retrieve the tuple. * @param record kinesis record * @return storm tuple to be emitted for this record, null if no tuple should be emitted */ - List<Object> getTuple (Record record); + List<Object> getTuple(Record record); } diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java similarity index 84% rename from external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java rename to external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java index 46a865b..d0a2dea 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java @@ -18,31 +18,33 @@ package org.apache.storm.kinesis.spout; +import java.nio.charset.Charset; +import java.util.Map; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.json.simple.JSONValue; -import java.nio.charset.Charset; -import java.util.Map; - -class ZKConnection { +class ZkConnection { private final ZkInfo zkInfo; private CuratorFramework curatorFramework; - ZKConnection (ZkInfo zkInfo) { + ZkConnection(ZkInfo zkInfo) { this.zkInfo = zkInfo; } - void initialize () { - curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new - RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs())); + void initialize() { + curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), + zkInfo.getSessionTimeoutMs(), + zkInfo.getConnectionTimeoutMs(), + new RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs())); curatorFramework.start(); } - void commitState (String stream, String shardId, Map<Object, Object> state) { + void commitState(String stream, String shardId, Map<Object, Object> state) { byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8")); try { String path = getZkPath(stream, shardId); @@ -59,7 +61,7 @@ class ZKConnection { } } - Map<Object, Object> readState (String stream, String shardId) { + Map<Object, Object> readState(String stream, String shardId) { try { String path = getZkPath(stream, shardId); Map<Object, Object> state = null; @@ -76,11 +78,11 @@ class ZKConnection { } } - void shutdown () { + void shutdown() { curatorFramework.close(); } - private String getZkPath (String stream, String shardId) { + private String getZkPath(String stream, String shardId) { String path = ""; if (!zkInfo.getZkNode().startsWith("/")) { path += "/"; diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java index a47f0ab..35e2890 100644 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java +++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java @@ -36,8 +36,13 @@ public class ZkInfo implements Serializable { // time to sleep between retries in milliseconds private final Integer retryIntervalMs; - public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, Integer - retryIntervalMs) { + public ZkInfo(String zkUrl, + String zkNode, + Integer sessionTimeoutMs, + Integer connectionTimeoutMs, + Long commitIntervalMs, + Integer retryAttempts, + Integer retryIntervalMs) { this.zkUrl = zkUrl; this.zkNode = zkNode; this.sessionTimeoutMs = sessionTimeoutMs; @@ -76,7 +81,7 @@ public class ZkInfo implements Serializable { return retryIntervalMs; } - private void validate () { + private void validate() { if (zkUrl == null || zkUrl.length() < 1) { throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper"); @@ -91,12 +96,13 @@ public class ZkInfo implements Serializable { checkPositive(retryIntervalMs, "retryIntervalMs"); } - private void checkPositive (Integer argument, String name) { + private void checkPositive(Integer argument, String name) { if (argument == null && argument <= 0) { throw new IllegalArgumentException(name + " must be positive"); } } - private void checkPositive (Long argument, String name) { + + private void checkPositive(Long argument, String name) { if (argument == null && argument <= 0) { throw new IllegalArgumentException(name + " must be positive"); } @@ -104,15 +110,15 @@ public class ZkInfo implements Serializable { @Override public String toString() { - return "ZkInfo{" + - "zkUrl='" + zkUrl + '\'' + - ", zkNode='" + zkNode + '\'' + - ", sessionTimeoutMs=" + sessionTimeoutMs + - ", connectionTimeoutMs=" + connectionTimeoutMs + - ", commitIntervalMs=" + commitIntervalMs + - ", retryAttempts=" + retryAttempts + - ", retryIntervalMs=" + retryIntervalMs + - '}'; + return "ZkInfo{" + + "zkUrl='" + zkUrl + '\'' + + ", zkNode='" + zkNode + '\'' + + ", sessionTimeoutMs=" + sessionTimeoutMs + + ", connectionTimeoutMs=" + connectionTimeoutMs + + ", commitIntervalMs=" + commitIntervalMs + + ", retryAttempts=" + retryAttempts + + ", retryIntervalMs=" + retryIntervalMs + + '}'; } }