This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 01b299b STORM-3457: hdfs: fix all checkstyle warnings new 98ac65f Merge pull request #3075 from krichter722/checkstyle-hdfs 01b299b is described below commit 01b299bbd4c0a749f122befc7ec5d275832fdfe6 Author: Karl-Philipp Richter <krich...@posteo.de> AuthorDate: Thu Jul 4 22:51:35 2019 +0200 STORM-3457: hdfs: fix all checkstyle warnings --- external/storm-hdfs/pom.xml | 2 +- .../storm/hdfs/avro/AbstractAvroSerializer.java | 2 +- .../storm/hdfs/avro/ConfluentAvroSerializer.java | 2 +- .../storm/hdfs/avro/FixedAvroSerializer.java | 2 +- .../apache/storm/hdfs/bolt/AbstractHdfsBolt.java | 22 ++++--- .../apache/storm/hdfs/bolt/SequenceFileBolt.java | 4 +- .../hdfs/bolt/format/DefaultFileNameFormat.java | 9 +-- .../hdfs/bolt/format/DelimitedRecordFormat.java | 11 +--- .../storm/hdfs/bolt/format/FileNameFormat.java | 1 - .../storm/hdfs/bolt/format/RecordFormat.java | 5 +- .../storm/hdfs/bolt/format/SequenceFormat.java | 14 +---- .../hdfs/bolt/format/SimpleFileNameFormat.java | 4 +- .../hdfs/bolt/rotation/FileRotationPolicy.java | 8 +-- .../hdfs/bolt/rotation/FileSizeRotationPolicy.java | 4 +- .../storm/hdfs/bolt/sync/CountSyncPolicy.java | 1 - .../storm/hdfs/common/AbstractHDFSWriter.java | 19 +++--- .../hdfs/common/AvroGenericRecordHDFSWriter.java | 2 +- .../org/apache/storm/hdfs/common/HDFSWriter.java | 1 + .../org/apache/storm/hdfs/common/Partitioner.java | 3 +- .../storm/hdfs/common/rotation/RotationAction.java | 1 - .../java/org/apache/storm/hdfs/spout/Configs.java | 39 ++++++++---- .../java/org/apache/storm/hdfs/spout/DirLock.java | 15 +++-- .../java/org/apache/storm/hdfs/spout/FileLock.java | 69 +++++++++------------- .../org/apache/storm/hdfs/spout/FileOffset.java | 4 +- .../org/apache/storm/hdfs/spout/FileReader.java | 3 +- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 35 +++++------ .../storm/hdfs/spout/SequenceFileReader.java | 38 ++++++------ .../apache/storm/hdfs/spout/TextFileReader.java | 20 +++---- .../org/apache/storm/hdfs/trident/HdfsState.java | 41 ++++++------- .../hdfs/trident/format/DefaultFileNameFormat.java | 8 +-- .../hdfs/trident/format/DelimitedRecordFormat.java | 11 ---- .../storm/hdfs/trident/format/FileNameFormat.java | 2 - .../storm/hdfs/trident/format/RecordFormat.java | 5 +- .../storm/hdfs/trident/format/SequenceFormat.java | 14 +---- .../hdfs/trident/format/SimpleFileNameFormat.java | 4 +- .../hdfs/trident/rotation/FileRotationPolicy.java | 4 +- .../trident/rotation/FileSizeRotationPolicy.java | 5 +- .../hdfs/trident/rotation/TimedRotationPolicy.java | 2 +- .../storm/hdfs/trident/sync/CountSyncPolicy.java | 1 - .../org/apache/storm/hdfs/spout/TestFileLock.java | 2 +- 40 files changed, 187 insertions(+), 252 deletions(-) diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index 60c74a9..d0db240 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -275,7 +275,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>189</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> <plugin> diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java index adb842a..da7ab74 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java @@ -57,7 +57,7 @@ public abstract class AbstractAvroSerializer extends Serializer<GenericContainer } @Override - public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) { + public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> someClass) { Schema theSchema = this.getSchema(input.readString()); GenericDatumReader<GenericContainer> reader = new GenericDatumReader<>(theSchema); Decoder decoder = DecoderFactory diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java index 17f3eb7..128d4ff 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java @@ -27,7 +27,7 @@ import org.apache.avro.Schema; */ public class ConfluentAvroSerializer extends AbstractAvroSerializer { - final private String url; + private final String url; private SchemaRegistryClient theClient; /** diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java index 128e802..94607b3 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java @@ -30,7 +30,7 @@ import org.apache.commons.codec.binary.Base64; */ public class FixedAvroSerializer extends AbstractAvroSerializer { - private final static String FP_ALGO = "CRC-64-AVRO"; + private static final String FP_ALGO = "CRC-64-AVRO"; final Map<String, Schema> fingerprint2schemaMap = new HashMap<>(); final Map<Schema, String> schema2fingerprintMap = new HashMap<>(); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index 156e58a..dfcf30f 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -87,15 +87,16 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { /** * Marked as final to prevent override. Subclasses should implement the doPrepare() method. - * @param conf - * @param topologyContext - * @param collector */ @Override public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) { this.writeLock = new Object(); - if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified."); - if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified."); + if (this.syncPolicy == null) { + throw new IllegalStateException("SyncPolicy must be specified."); + } + if (this.rotationPolicy == null) { + throw new IllegalStateException("RotationPolicy must be specified."); + } if (this.fsUrl == null) { throw new IllegalStateException("File system URL must be specified."); } @@ -208,13 +209,10 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } /** - * A tuple must be mapped to a writer based on two factors: + * A tuple must be mapped to a writer based on two factors. * - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt * for an example of this) * - the directory the tuple will be partioned into - * - * @param tuple - * @return */ private String getHashKeyForTuple(Tuple tuple) { final String boltKey = getWriterKey(tuple); @@ -300,12 +298,12 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { this.fileNameFormat.getName(rotation, System.currentTimeMillis())); } - abstract protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws + protected abstract void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException; - abstract protected String getWriterKey(Tuple tuple); + protected abstract String getWriterKey(Tuple tuple); - abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws IOException; + protected abstract Writer makeNewWriter(Path path, Tuple tuple) throws IOException; static class WritersMap extends LinkedHashMap<String, Writer> { final long maxWriters; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java index c73c6a2..991a23c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java @@ -114,7 +114,9 @@ public class SequenceFileBolt extends AbstractHdfsBolt { @Override public void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException { LOG.info("Preparing Sequence File Bolt..."); - if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified."); + if (this.format == null) { + throw new IllegalStateException("SequenceFormat must be specified."); + } this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); this.codecFactory = new CompressionCodecFactory(hdfsConfig); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java index 42c1f5b..157929c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java @@ -15,7 +15,6 @@ package org.apache.storm.hdfs.bolt.format; import java.util.Map; import org.apache.storm.task.TopologyContext; - /** * Creates file names with the following format: * <pre> @@ -26,7 +25,7 @@ import org.apache.storm.task.TopologyContext; * MyBolt-5-7-1390579837830.txt * </pre> * - * By default, prefix is empty and extenstion is ".txt". + * <p>By default, prefix is empty and extenstion is ".txt". * */ public class DefaultFileNameFormat implements FileNameFormat { @@ -38,9 +37,6 @@ public class DefaultFileNameFormat implements FileNameFormat { /** * Overrides the default prefix. - * - * @param prefix - * @return */ public DefaultFileNameFormat withPrefix(String prefix) { this.prefix = prefix; @@ -49,9 +45,6 @@ public class DefaultFileNameFormat implements FileNameFormat { /** * Overrides the default file extension. - * - * @param extension - * @return */ public DefaultFileNameFormat withExtension(String extension) { this.extension = extension; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java index e9a81cc..f8cdad9 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java @@ -20,7 +20,7 @@ import org.apache.storm.tuple.Tuple; * By default uses a comma (",") as the field delimiter and a * newline ("\n") as the record delimiter. * - * Also by default, this implementation will output all the + * <p>Also by default, this implementation will output all the * field values in the tuple in the order they were declared. To * override this behavior, call <code>withFields()</code> to * specify which tuple fields to output. @@ -35,9 +35,6 @@ public class DelimitedRecordFormat implements RecordFormat { /** * Only output the specified fields. - * - * @param fields - * @return */ public DelimitedRecordFormat withFields(Fields fields) { this.fields = fields; @@ -46,9 +43,6 @@ public class DelimitedRecordFormat implements RecordFormat { /** * Overrides the default field delimiter. - * - * @param delimiter - * @return */ public DelimitedRecordFormat withFieldDelimiter(String delimiter) { this.fieldDelimiter = delimiter; @@ -57,9 +51,6 @@ public class DelimitedRecordFormat implements RecordFormat { /** * Overrides the default record delimiter. - * - * @param delimiter - * @return */ public DelimitedRecordFormat withRecordDelimiter(String delimiter) { this.recordDelimiter = delimiter; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java index 891c600..70210a9 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java @@ -28,7 +28,6 @@ public interface FileNameFormat extends Serializable { * Returns the filename the HdfsBolt will create. * @param rotation the current file rotation number (incremented on every rotation) * @param timeStamp current time in milliseconds when the rotation occurs - * @return */ String getName(long rotation, long timeStamp); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java index 8d55c03..5102f38 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java @@ -12,14 +12,11 @@ package org.apache.storm.hdfs.bolt.format; - import java.io.Serializable; import org.apache.storm.tuple.Tuple; /** - * Formats a Tuple object into a byte array - * that will be written to HDFS. - * + * Formats a Tuple object into a byte array that will be written to HDFS. */ public interface RecordFormat extends Serializable { byte[] format(Tuple tuple); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java index 7c38f66..7ea05a9 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java @@ -26,32 +26,22 @@ import org.apache.storm.tuple.Tuple; */ public interface SequenceFormat extends Serializable { /** - * Key class used by implementation (e.g. IntWritable.class, etc.) - * - * @return + * Key class used by implementation (e.g. IntWritable.class, etc.). */ Class keyClass(); /** - * Value class used by implementation (e.g. Text.class, etc.) - * - * @return + * Value class used by implementation (e.g. Text.class, etc.). */ Class valueClass(); /** * Given a tuple, return the key that should be written to the sequence file. - * - * @param tuple - * @return */ Object key(Tuple tuple); /** * Given a tuple, return the value that should be written to the sequence file. - * - * @param tuple - * @return */ Object value(Tuple tuple); } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java index 7869d69..d80aaa8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java @@ -73,9 +73,7 @@ public class SimpleFileNameFormat implements FileNameFormat { * $COMPONENT - component id<br/> * $TASK - task id<br/> * - * @param name - * file name - * @return + * @param name file name */ public SimpleFileNameFormat withName(String name) { this.name = name; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java index 6354ae7..13229dd 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java @@ -12,18 +12,17 @@ package org.apache.storm.hdfs.bolt.rotation; - import java.io.Serializable; import org.apache.storm.tuple.Tuple; /** * Used by the HdfsBolt to decide when to rotate files. * - * The HdfsBolt will call the <code>mark()</code> method for every + * <p>The HdfsBolt will call the <code>mark()</code> method for every * tuple received. If the <code>mark()</code> method returns * <code>true</code> the HdfsBolt will perform a file rotation. * - * After file rotation, the HdfsBolt will call the <code>reset()</code> + * <p>After file rotation, the HdfsBolt will call the <code>reset()</code> * method. */ public interface FileRotationPolicy extends Serializable { @@ -39,12 +38,11 @@ public interface FileRotationPolicy extends Serializable { /** * Called after the HdfsBolt rotates a file. - * */ void reset(); /** - * Must be able to copy the rotation policy + * Must be able to copy the rotation policy. */ FileRotationPolicy copy(); } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java index 230e8b4..24a46dd 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java @@ -12,7 +12,6 @@ package org.apache.storm.hdfs.bolt.rotation; - import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,7 +20,7 @@ import org.slf4j.LoggerFactory; * File rotation policy that will rotate files when a certain * file size is reached. * - * For example: + * <p>For example: * <pre> * // rotate when files reach 5MB * FileSizeRotationPolicy policy = @@ -34,6 +33,7 @@ public class FileSizeRotationPolicy implements FileRotationPolicy { private long maxBytes; private long lastOffset = 0; private long currentBytesWritten = 0; + public FileSizeRotationPolicy(float count, Units units) { this.maxBytes = (long) (count * units.getByteCount()); } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java index 45abc7d..a048dc3 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java @@ -12,7 +12,6 @@ package org.apache.storm.hdfs.bolt.sync; - import org.apache.storm.tuple.Tuple; /** diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java index 6c2b0e0..caf6b49 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java @@ -18,9 +18,10 @@ import org.apache.storm.hdfs.bolt.Writer; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.tuple.Tuple; -abstract public class AbstractHDFSWriter implements Writer { - final protected Path filePath; - final protected FileRotationPolicy rotationPolicy; +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") +public abstract class AbstractHDFSWriter implements Writer { + protected final Path filePath; + protected final FileRotationPolicy rotationPolicy; protected long lastUsedTime; protected long offset; protected boolean needsRotation; @@ -32,7 +33,7 @@ abstract public class AbstractHDFSWriter implements Writer { } @Override - final public long write(Tuple tuple) throws IOException { + public final long write(Tuple tuple) throws IOException { doWrite(tuple); this.needsRotation = rotationPolicy.mark(tuple, offset); @@ -40,12 +41,12 @@ abstract public class AbstractHDFSWriter implements Writer { } @Override - final public void sync() throws IOException { + public final void sync() throws IOException { doSync(); } @Override - final public void close() throws IOException { + public final void close() throws IOException { doClose(); } @@ -59,10 +60,10 @@ abstract public class AbstractHDFSWriter implements Writer { return this.filePath; } - abstract protected void doWrite(Tuple tuple) throws IOException; + protected abstract void doWrite(Tuple tuple) throws IOException; - abstract protected void doSync() throws IOException; + protected abstract void doSync() throws IOException; - abstract protected void doClose() throws IOException; + protected abstract void doClose() throws IOException; } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java index d77423c..713aa58 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java @@ -12,7 +12,6 @@ package org.apache.storm.hdfs.common; - import java.io.IOException; import java.util.EnumSet; import org.apache.avro.Schema; @@ -28,6 +27,7 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter { private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java index 578bc06..8b3dcd1 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java @@ -23,6 +23,7 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") public class HDFSWriter extends AbstractHDFSWriter { private static final Logger LOG = LoggerFactory.getLogger(HDFSWriter.class); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java index 9f79373..1488655 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java @@ -21,10 +21,9 @@ public interface Partitioner extends Serializable { * Return a relative path that the tuple should be written to. For example, if an HdfsBolt were configured to write * to /common/output and a partitioner returned "/foo" then the bolt should open a file in "/common/output/foo" * - * A best practice is to use Path.SEPARATOR instead of a literal "/" + * <p>A best practice is to use Path.SEPARATOR instead of a literal "/" * * @param tuple The tuple for which the relative path is being calculated. - * @return */ public String getPartitionPath(final Tuple tuple); } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java index f5ade03..bad3d06 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java @@ -18,7 +18,6 @@ package org.apache.storm.hdfs.common.rotation; - import java.io.IOException; import java.io.Serializable; import org.apache.hadoop.fs.FileSystem; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java index f94b8e5..9859d08 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java @@ -24,59 +24,68 @@ import org.apache.storm.validation.Validated; public class Configs implements Validated { /** - * @deprecated please use {@link HdfsSpout.setReaderType(String)} + * Required - chose the file type being consumed. + * @deprecated please use {@link HdfsSpout#setReaderType(String)} */ @Deprecated @isString @CustomValidator(validatorClass = ReaderTypeValidator.class) - public static final String READER_TYPE = "hdfsspout.reader.type"; // Required - chose the file type being consumed + public static final String READER_TYPE = "hdfsspout.reader.type"; public static final String TEXT = "text"; public static final String SEQ = "seq"; /** + * Required - HDFS name node. * @deprecated please use {@link HdfsSpout#setHdfsUri(String)} */ @Deprecated @isString - public static final String HDFS_URI = "hdfsspout.hdfs"; // Required - HDFS name node + public static final String HDFS_URI = "hdfsspout.hdfs"; /** + * Required - dir from which to read files. * @deprecated please use {@link HdfsSpout#setSourceDir(String)} */ @Deprecated @isString - public static final String SOURCE_DIR = "hdfsspout.source.dir"; // Required - dir from which to read files + public static final String SOURCE_DIR = "hdfsspout.source.dir"; /** + * Required - completed files will be moved here. * @deprecated please use {@link HdfsSpout#setArchiveDir(String)} */ @Deprecated @isString - public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // Required - completed files will be moved here + public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; /** + * Required - unparsable files will be moved here. * @deprecated please use {@link HdfsSpout#setBadFilesDir(String)} */ @Deprecated @isString - public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // Required - unparsable files will be moved here + public static final String BAD_DIR = "hdfsspout.badfiles.dir"; /** + * Directory in which lock files will be created. * @deprecated please use {@link HdfsSpout#setLockDir(String)} */ @Deprecated @isString - public static final String LOCK_DIR = "hdfsspout.lock.dir"; // dir in which lock files will be created + public static final String LOCK_DIR = "hdfsspout.lock.dir"; /** + * Commit after N records. 0 disables this. * @deprecated please use {@link HdfsSpout#setCommitFrequencyCount(int)} */ @Deprecated @isInteger @isPositiveNumber(includeZero = true) - public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this. + public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; /** + * Commit after N secs. cannot be disabled. * @deprecated please use {@link HdfsSpout#setCommitFrequencySec(int)} */ @Deprecated @isInteger @isPositiveNumber - public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs. cannot be disabled. + public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; /** + * Max outstanding. * @deprecated please use {@link HdfsSpout#setMaxOutstanding(int)} */ @Deprecated @@ -84,6 +93,7 @@ public class Configs implements Validated { @isPositiveNumber(includeZero = true) public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding"; /** + * Lock timeout. * @deprecated please use {@link HdfsSpout#setLockTimeoutSec(int)} */ @Deprecated @@ -91,21 +101,26 @@ public class Configs implements Validated { @isPositiveNumber public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; /** + * If clocks on machines in the Storm cluster are in sync inactivity duration after which locks are considered + * candidates for being reassigned to another spout. + * * @deprecated please use {@link HdfsSpout#setClocksInSync(boolean)} */ @Deprecated @isBoolean - public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync - // inactivity duration after which locks are considered candidates for being reassigned to another spout + public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; /** + * Ignore suffix. * @deprecated please use {@link HdfsSpout#setIgnoreSuffix(String)} */ @Deprecated @isString public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; + /** + * Filenames with this suffix in archive dir will be ignored by the Spout. + */ @NotConf public static final String DEFAULT_LOCK_DIR = ".lock"; - // filenames with this suffix in archive dir will be ignored by the Spout public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000; public static final int DEFAULT_COMMIT_FREQ_SEC = 10; public static final int DEFAULT_MAX_OUTSTANDING = 10000; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java index eea23e1..488531a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java @@ -39,9 +39,8 @@ public class DirLock { this.lockFile = lockFile; } - /** Get a lock on file if not already locked + /** Get a lock on file if not already locked. * - * @param fs * @param dir the dir on which to get a lock * @return The lock object if it the lock was acquired. Returns null if the dir is already locked. * @throws IOException if there were errors @@ -74,7 +73,9 @@ public class DirLock { + Thread.currentThread().getName(); } - /** if the lock on the directory is stale, take ownership */ + /** + * if the lock on the directory is stale, take ownership. + */ public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) { Path dirLockFile = getDirLockFile(dirToLock); @@ -95,8 +96,8 @@ public class DirLock { private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException { if (fs instanceof DistributedFileSystem) { if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) { - LOG.warn("Unable to recover lease on dir lock file " + dirLockFile + - " right now. Cannot transfer ownership. Will need to try later."); + LOG.warn("Unable to recover lease on dir lock file " + dirLockFile + + " right now. Cannot transfer ownership. Will need to try later."); return null; } } @@ -112,7 +113,9 @@ public class DirLock { return null; } - /** Release lock on dir by deleting the lock file */ + /** + * Release lock on dir by deleting the lock file. + */ public void release() throws IOException { if (!fs.delete(lockFile, false)) { LOG.error("Thread {} could not delete dir lock {} ", threadInfo(), lockFile); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java index c6529a9..7ff5aaa 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java @@ -12,7 +12,6 @@ package org.apache.storm.hdfs.spout; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -38,7 +37,7 @@ public class FileLock { private static final Logger LOG = LoggerFactory.getLogger(FileLock.class); private final FileSystem fs; - private final String componentID; + private final String componentId; private final Path lockFile; private final FSDataOutputStream lockFileStream; private LogEntry lastEntry; @@ -48,7 +47,7 @@ public class FileLock { this.fs = fs; this.lockFile = lockFile; this.lockFileStream = lockFileStream; - this.componentID = spoutId; + this.componentId = spoutId; logProgress("0", false); } @@ -57,12 +56,14 @@ public class FileLock { this.fs = fs; this.lockFile = lockFile; this.lockFileStream = fs.append(lockFile); - this.componentID = spoutId; + this.componentId = spoutId; LOG.info("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId); logProgress(entry.fileOffset, true); } - /** returns lock on file or null if file is already locked. throws if unexpected problem */ + /** + * returns lock on file or null if file is already locked. throws if unexpected problem + */ public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId) throws IOException { Path lockFile = new Path(lockDirPath, fileToLock.getName()); @@ -86,11 +87,9 @@ public class FileLock { * checks if lockFile is older than 'olderThan' UTC time by examining the modification time * on file and (if necessary) the timestamp in last log entry in the file. If its stale, then * returns the last log entry, else returns null. - * @param fs - * @param lockFile + * * @param olderThan time (millis) in UTC. * @return the last entry in the file if its too old. null if last entry is not too old - * @throws IOException */ public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan) throws IOException { @@ -117,11 +116,7 @@ public class FileLock { } /** - * returns the last log entry - * @param fs - * @param lockFile - * @return - * @throws IOException + * returns the last log entry. */ public static LogEntry getLastEntry(FileSystem fs, Path lockFile) throws IOException { @@ -136,14 +131,13 @@ public class FileLock { /** * Takes ownership of the lock file if possible. - * @param lockFile * @param lastEntry last entry in the lock file. this param is an optimization. * we dont scan the lock file again to find its last entry here since * its already been done once by the logic used to check if the lock * file is stale. so this value comes from that earlier scan. * @param spoutId spout id - * @throws IOException if unable to acquire * @return null if lock File is not recoverable + * @throws IOException if unable to acquire */ public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId) throws IOException { @@ -158,11 +152,12 @@ public class FileLock { } return new FileLock(fs, lockFile, spoutId, lastEntry); } catch (IOException e) { - if (e instanceof RemoteException && - ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) { + if (e instanceof RemoteException + && ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) { LOG.warn( - "Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId, - e); + "Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + + spoutId, + e); return null; } else { // unexpected error LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e); @@ -173,13 +168,9 @@ public class FileLock { /** * Finds a oldest expired lock file (using modification timestamp), then takes - * ownership of the lock file + * ownership of the lock file. * Impt: Assumes access to lockFilesDir has been externally synchronized such that * only one thread accessing the same thread - * @param fs - * @param lockFilesDir - * @param locktimeoutSec - * @return */ public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId) throws IOException { @@ -209,14 +200,11 @@ public class FileLock { /** * Finds oldest expired lock file (using modification timestamp), then takes - * ownership of the lock file + * ownership of the lock file. * Impt: Assumes access to lockFilesDir has been externally synchronized such that * only one thread accessing the same thread - * @param fs - * @param lockFilesDir - * @param locktimeoutSec - * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found - * @throws IOException + * + * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found */ public static HdfsUtils.Pair<Path, LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec) throws IOException { @@ -248,7 +236,7 @@ public class FileLock { private void logProgress(String fileOffset, boolean prefixNewLine) throws IOException { long now = System.currentTimeMillis(); - LogEntry entry = new LogEntry(now, componentID, fileOffset); + LogEntry entry = new LogEntry(now, componentId, fileOffset); String line = entry.toString(); if (prefixNewLine) { lockFileStream.writeBytes(System.lineSeparator() + line); @@ -260,16 +248,17 @@ public class FileLock { lastEntry = entry; // update this only after writing to hdfs } - /** Release lock by deleting file + /** + * Release lock by deleting file. * @throws IOException if lock file could not be deleted */ public void release() throws IOException { lockFileStream.close(); if (!fs.delete(lockFile, false)) { - LOG.warn("Unable to delete lock file, Spout = {}", componentID); + LOG.warn("Unable to delete lock file, Spout = {}", componentId); throw new IOException("Unable to delete lock file"); } - LOG.debug("Released lock file {}. Spout {}", lockFile, componentID); + LOG.debug("Released lock file {}. Spout {}", lockFile, componentId); } // For testing only.. invoked via reflection @@ -288,12 +277,12 @@ public class FileLock { public static class LogEntry { private static final int NUM_FIELDS = 3; public final long eventTime; - public final String componentID; + public final String componentId; public final String fileOffset; - public LogEntry(long eventtime, String componentID, String fileOffset) { + public LogEntry(long eventtime, String componentId, String fileOffset) { this.eventTime = eventtime; - this.componentID = componentID; + this.componentId = componentId; this.fileOffset = fileOffset; } @@ -304,7 +293,7 @@ public class FileLock { @Override public String toString() { - return eventTime + "," + componentID + "," + fileOffset; + return eventTime + "," + componentId + "," + fileOffset; } @Override @@ -321,7 +310,7 @@ public class FileLock { if (eventTime != logEntry.eventTime) { return false; } - if (!componentID.equals(logEntry.componentID)) { + if (!componentId.equals(logEntry.componentId)) { return false; } return fileOffset.equals(logEntry.fileOffset); @@ -331,7 +320,7 @@ public class FileLock { @Override public int hashCode() { int result = (int) (eventTime ^ (eventTime >>> 32)); - result = 31 * result + componentID.hashCode(); + result = 31 * result + componentId.hashCode(); result = 31 * result + fileOffset.hashCode(); return result; } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java index bf58815..a8b354a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java @@ -24,7 +24,9 @@ package org.apache.storm.hdfs.spout; */ interface FileOffset extends Comparable<FileOffset>, Cloneable { - /** tests if rhs == currOffset+1 */ + /** + * tests if rhs == currOffset+1. + */ boolean isNextOffset(FileOffset rhs); FileOffset clone(); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java index 49d998a..b6e08f4 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java @@ -26,10 +26,9 @@ interface FileReader { FileOffset getFileOffset(); /** - * Get the next tuple from the file + * Get the next tuple from the file. * * @return null if no more data - * @throws IOException */ List<Object> next() throws IOException, ParseException; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 18c469f..a7ce729 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -62,7 +62,7 @@ public class HdfsSpout extends BaseRichSpout { private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING; private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; private boolean clocksInSync = true; - private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts + private String inprogressSuffix = ".inprogress"; // not configurable to prevent change between topology restarts private String ignoreSuffix = ".ignore"; private String outputStreamName = null; private ProgressTracker tracker = null; @@ -89,14 +89,14 @@ public class HdfsSpout extends BaseRichSpout { return reader.getFilePath() + " " + reader.getFileOffset(); } - private static void releaseLockAndLog(FileLock fLock, String spoutId) { + private static void releaseLockAndLog(FileLock fileLock, String spoutId) { try { - if (fLock != null) { - fLock.release(); - LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId); + if (fileLock != null) { + fileLock.release(); + LOG.debug("Spout {} released FileLock. SpoutId = {}", fileLock.getLockFile(), spoutId); } } catch (IOException e) { - LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e); + LOG.error("Unable to delete lock file : " + fileLock.getLockFile() + " SpoutId =" + spoutId, e); } } @@ -215,7 +215,7 @@ public class HdfsSpout extends BaseRichSpout { } /** - * Set output stream name + * Set output stream name. */ public HdfsSpout withOutputStream(String streamName) { this.outputStreamName = streamName; @@ -348,7 +348,7 @@ public class HdfsSpout extends BaseRichSpout { private void markFileAsBad(Path file) { String fileName = file.toString(); - String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); + String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogressSuffix)); String originalName = new Path(fileNameMinusSuffix).getName(); Path newFile = new Path(badFilesDirPath + Path.SEPARATOR + originalName); @@ -590,7 +590,7 @@ public class HdfsSpout extends BaseRichSpout { Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0); for (Path file : listing) { - if (file.getName().endsWith(inprogress_suffix)) { + if (file.getName().endsWith(inprogressSuffix)) { continue; } if (file.getName().endsWith(ignoreSuffix)) { @@ -625,7 +625,6 @@ public class HdfsSpout extends BaseRichSpout { * check if the lock is updated. if not updated then acquires the lock * * @return a lock object - * @throws IOException */ private FileLock getOldestExpiredLock() throws IOException { // 1 - acquire lock on dir @@ -681,11 +680,9 @@ public class HdfsSpout extends BaseRichSpout { } /** - * Creates a reader that reads from beginning of file + * Creates a reader that reads from beginning of file. * * @param file file to read - * @return - * @throws IOException */ private FileReader createFileReader(Path file) throws IOException { @@ -706,12 +703,10 @@ public class HdfsSpout extends BaseRichSpout { } /** - * Creates a reader that starts reading from 'offset' + * Creates a reader that starts reading from 'offset'. * * @param file the file to read * @param offset the offset string should be understandable by the reader type being used - * @return - * @throws IOException */ private FileReader createFileReader(Path file, String offset) throws IOException { @@ -733,14 +728,14 @@ public class HdfsSpout extends BaseRichSpout { } /** - * Renames files with .inprogress suffix + * Renames files with .inprogress suffix. * * @return path of renamed file * @throws if operation fails */ private Path renameToInProgressFile(Path file) throws IOException { - Path newFile = new Path(file.toString() + inprogress_suffix); + Path newFile = new Path(file.toString() + inprogressSuffix); try { if (hdfs.rename(file, newFile)) { return newFile; @@ -757,7 +752,7 @@ public class HdfsSpout extends BaseRichSpout { private Path getFileForLockFile(Path lockFile, Path sourceDirPath) throws IOException { String lockFileName = lockFile.getName(); - Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix); + Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogressSuffix); if (hdfs.exists(dataFile)) { return dataFile; } @@ -771,7 +766,7 @@ public class HdfsSpout extends BaseRichSpout { // renames files and returns the new file path private Path renameCompletedFile(Path file) throws IOException { String fileName = file.toString(); - String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); + String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogressSuffix)); String newName = new Path(fileNameMinusSuffix).getName(); Path newFile = new Path(archiveDirPath + Path.SEPARATOR + newName); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java index 39dccae..d245df7 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java @@ -25,8 +25,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SequenceFileReader<Key extends Writable, Value extends Writable> - extends AbstractFileReader { +public class SequenceFileReader<KeyT extends Writable, ValueT extends Writable> extends AbstractFileReader { public static final String[] defaultFields = { "key", "value" }; public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; private static final Logger LOG = LoggerFactory @@ -37,8 +36,8 @@ public class SequenceFileReader<Key extends Writable, Value extends Writable> private final SequenceFileReader.Offset offset; - private final Key key; - private final Value value; + private final KeyT key; + private final ValueT value; public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf) @@ -46,8 +45,8 @@ public class SequenceFileReader<Key extends Writable, Value extends Writable> super(fs, file); int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString()); this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize)); - this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); - this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); + this.key = (KeyT) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); + this.value = (ValueT) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); this.offset = new SequenceFileReader.Offset(0, 0, 0); } @@ -57,8 +56,8 @@ public class SequenceFileReader<Key extends Writable, Value extends Writable> int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString()); this.offset = new SequenceFileReader.Offset(offset); this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize)); - this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); - this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); + this.key = (KeyT) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); + this.value = (ValueT) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); skipToOffset(this.reader, this.offset, this.key); } @@ -106,8 +105,11 @@ public class SequenceFileReader<Key extends Writable, Value extends Writable> this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0); } - public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord - , long currRecordEndOffset, long prevRecordEndOffset) { + public Offset(long lastSyncPoint, + long recordsSinceLastSync, + long currentRecord, + long currRecordEndOffset, + long prevRecordEndOffset) { this.lastSyncPoint = lastSyncPoint; this.recordsSinceLastSync = recordsSinceLastSync; this.currentRecord = currentRecord; @@ -135,19 +137,19 @@ public class SequenceFileReader<Key extends Writable, Value extends Writable> this.currRecordEndOffset = 0; } } catch (Exception e) { - throw new IllegalArgumentException("'" + offset + - "' cannot be interpreted. It is not in expected format for SequenceFileReader." + - " Format e.g. {sync=123:afterSync=345:record=67}"); + throw new IllegalArgumentException("'" + offset + + "' cannot be interpreted. It is not in expected format for SequenceFileReader." + + " Format e.g. {sync=123:afterSync=345:record=67}"); } } @Override public String toString() { - return '{' + - "sync=" + lastSyncPoint + - ":afterSync=" + recordsSinceLastSync + - ":record=" + currentRecord + - ":}"; + return '{' + + "sync=" + lastSyncPoint + + ":afterSync=" + recordsSinceLastSync + + ":record=" + currentRecord + + ":}"; } @Override diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java index 94b40f3..0b3da9c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java @@ -31,7 +31,7 @@ public class TextFileReader extends AbstractFileReader { public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; private static final int DEFAULT_BUFF_SIZE = 4096; - private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class); + private static final Logger LOG = LoggerFactory.getLogger(TextFileReader.class); private BufferedReader reader; private TextFileReader.Offset offset; @@ -124,26 +124,26 @@ public class TextFileReader extends AbstractFileReader { this.lineNumber = Long.parseLong(parts[1].split("=")[1]); } } catch (Exception e) { - throw new IllegalArgumentException("'" + offset + - "' cannot be interpreted. It is not in expected format for TextFileReader." + - " Format e.g. {char=123:line=5}"); + throw new IllegalArgumentException("'" + offset + + "' cannot be interpreted. It is not in expected format for TextFileReader." + + " Format e.g. {char=123:line=5}"); } } @Override public String toString() { - return '{' + - "char=" + charOffset + - ":line=" + lineNumber + - ":}"; + return '{' + + "char=" + charOffset + + ":line=" + lineNumber + + ":}"; } @Override public boolean isNextOffset(FileOffset rhs) { if (rhs instanceof Offset) { Offset other = ((Offset) rhs); - return other.charOffset > charOffset && - other.lineNumber == lineNumber + 1; + return other.charOffset > charOffset + && other.lineNumber == lineNumber + 1; } return false; } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java index e6adfbb..118a113 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java @@ -101,9 +101,7 @@ public class HdfsState implements State { * Reads the last txn record from index file if it exists, if not from .tmp file if exists. * * @param indexFilePath the index file path - * @return the txn record from the index file or a default initial record. - * - * @throws IOException + * @return the txn record from the index file or a default initial record */ private TxnRecord getTxnRecord(Path indexFilePath) throws IOException { Path tmpPath = tmpFilePath(indexFilePath.toString()); @@ -186,13 +184,13 @@ public class HdfsState implements State { } /** - * for unit tests + * for unit tests. */ void close() throws IOException { this.options.closeOutputFile(); } - public static abstract class Options implements Serializable { + public abstract static class Options implements Serializable { protected String fsUrl; protected String configKey; @@ -216,10 +214,11 @@ public class HdfsState implements State { abstract void doCommit(Long txId) throws IOException; - abstract void doRecover(Path srcPath, long nBytes) throws Exception; + abstract void doRecover(Path srcPath, long numberOfBytes) throws Exception; protected void rotateOutputFile(boolean doRotateAction) throws IOException { LOG.info("Rotating output file..."); + @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance") long start = System.currentTimeMillis(); closeOutputFile(); this.rotation++; @@ -279,14 +278,14 @@ public class HdfsState implements State { /** * Recovers nBytes from srcFile to the new file created by calling rotateOutputFile and then deletes the srcFile. */ - private void recover(String srcFile, long nBytes) { + private void recover(String srcFile, long numberOfBytes) { try { Path srcPath = new Path(srcFile); rotateOutputFile(false); this.rotationPolicy.reset(); - if (nBytes > 0) { - doRecover(srcPath, nBytes); - LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile); + if (numberOfBytes > 0) { + doRecover(srcPath, numberOfBytes); + LOG.info("Recovered {} bytes from {} to {}", numberOfBytes, srcFile, currentFile); } else { LOG.info("Nothing to recover from {}", srcFile); } @@ -380,11 +379,11 @@ public class HdfsState implements State { } @Override - void doRecover(Path srcPath, long nBytes) throws IOException { + void doRecover(Path srcPath, long numberOfBytes) throws IOException { this.offset = 0; FSDataInputStream is = this.fs.open(srcPath); - copyBytes(is, out, nBytes); - this.offset = nBytes; + copyBytes(is, out, numberOfBytes); + this.offset = numberOfBytes; } private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException { @@ -468,7 +467,9 @@ public class HdfsState implements State { @Override void doPrepare(Map<String, Object> conf, int partitionIndex, int numPartitions) throws IOException { LOG.info("Preparing Sequence File State..."); - if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified."); + if (this.format == null) { + throw new IllegalStateException("SequenceFormat must be specified."); + } this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); this.codecFactory = new CompressionCodecFactory(hdfsConfig); @@ -491,9 +492,10 @@ public class HdfsState implements State { @Override - void doRecover(Path srcPath, long nBytes) throws Exception { + void doRecover(Path srcPath, long numberOfBytes) throws Exception { SequenceFile.Reader reader = new SequenceFile.Reader(this.hdfsConfig, - SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes)); + SequenceFile.Reader.file(srcPath), + SequenceFile.Reader.length(numberOfBytes)); Writable key = (Writable) this.format.keyClass().newInstance(); Writable value = (Writable) this.format.valueClass().newInstance(); @@ -531,10 +533,9 @@ public class HdfsState implements State { } /** - * TxnRecord [txnid, data_file_path, data_file_offset] - * <p> - * This is written to the index file during beginCommit() and used for recovery. - * </p> + * TxnRecord [txnid, data_file_path, data_file_offset]. + * + * <p>This is written to the index file during beginCommit() and used for recovery. */ private static class TxnRecord { private long txnid; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java index e97bf1c..e48e198 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java @@ -25,7 +25,7 @@ import java.util.Map; * MyBolt-5-7-1390579837830.txt * </pre> * - * By default, prefix is empty and extenstion is ".txt". + * <p>By default, prefix is empty and extenstion is ".txt". * */ public class DefaultFileNameFormat implements FileNameFormat { @@ -36,9 +36,6 @@ public class DefaultFileNameFormat implements FileNameFormat { /** * Overrides the default prefix. - * - * @param prefix - * @return */ public DefaultFileNameFormat withPrefix(String prefix) { this.prefix = prefix; @@ -47,9 +44,6 @@ public class DefaultFileNameFormat implements FileNameFormat { /** * Overrides the default file extension. - * - * @param extension - * @return */ public DefaultFileNameFormat withExtension(String extension) { this.extension = extension; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java index e21fede..c12b478 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java @@ -19,8 +19,6 @@ import org.apache.storm.tuple.Fields; * RecordFormat implementation that uses field and record delimiters. * By default uses a comma (",") as the field delimiter and a * newline ("\n") as the record delimiter. - * - * */ public class DelimitedRecordFormat implements RecordFormat { public static final String DEFAULT_FIELD_DELIMITER = ","; @@ -31,9 +29,6 @@ public class DelimitedRecordFormat implements RecordFormat { /** * Only output the specified fields. - * - * @param fields - * @return */ public DelimitedRecordFormat withFields(Fields fields) { this.fields = fields; @@ -42,9 +37,6 @@ public class DelimitedRecordFormat implements RecordFormat { /** * Overrides the default field delimiter. - * - * @param delimiter - * @return */ public DelimitedRecordFormat withFieldDelimiter(String delimiter) { this.fieldDelimiter = delimiter; @@ -53,9 +45,6 @@ public class DelimitedRecordFormat implements RecordFormat { /** * Overrides the default record delimiter. - * - * @param delimiter - * @return */ public DelimitedRecordFormat withRecordDelimiter(String delimiter) { this.recordDelimiter = delimiter; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java index fbd8f5a..0b7ac46 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java @@ -17,7 +17,6 @@ import java.util.Map; /** * Formatter interface for determining HDFS file names. - * */ public interface FileNameFormat extends Serializable { @@ -27,7 +26,6 @@ public interface FileNameFormat extends Serializable { * Returns the filename the HdfsBolt will create. * @param rotation the current file rotation number (incremented on every rotation) * @param timeStamp current time in milliseconds when the rotation occurs - * @return */ String getName(long rotation, long timeStamp); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java index 1cc5363..b2f2cc3 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java @@ -12,14 +12,11 @@ package org.apache.storm.hdfs.trident.format; - import java.io.Serializable; import org.apache.storm.trident.tuple.TridentTuple; /** - * Formats a Tuple object into a byte array - * that will be written to HDFS. - * + * Formats a Tuple object into a byte array that will be written to HDFS. */ public interface RecordFormat extends Serializable { byte[] format(TridentTuple tuple); diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java index 497d045..815bf2f 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java @@ -27,32 +27,22 @@ import org.apache.storm.trident.tuple.TridentTuple; */ public interface SequenceFormat extends Serializable { /** - * Key class used by implementation (e.g. IntWritable.class, etc.) - * - * @return + * Key class used by implementation (e.g. IntWritable.class, etc.). */ Class keyClass(); /** - * Value class used by implementation (e.g. Text.class, etc.) - * - * @return + * Value class used by implementation (e.g. Text.class, etc.). */ Class valueClass(); /** * Given a tuple, return the key that should be written to the sequence file. - * - * @param tuple - * @return */ Writable key(TridentTuple tuple); /** * Given a tuple, return the value that should be written to the sequence file. - * - * @param tuple - * @return */ Writable value(TridentTuple tuple); } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java index 068390f..889c60b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java @@ -68,9 +68,7 @@ public class SimpleFileNameFormat implements FileNameFormat { * $HOST - local host name<br/> * $PARTITION - partition index<br/> * - * @param name - * file name - * @return + * @param name file name */ public SimpleFileNameFormat withName(String name) { this.name = name; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java index ad9d7aa..a2a5932 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java @@ -18,11 +18,11 @@ import org.apache.storm.trident.tuple.TridentTuple; /** * Used by the HdfsBolt to decide when to rotate files. * - * The HdfsBolt will call the <code>mark()</code> method for every + * <p>The HdfsBolt will call the <code>mark()</code> method for every * tuple received. If the <code>mark()</code> method returns * <code>true</code> the HdfsBolt will perform a file rotation. * - * After file rotation, the HdfsBolt will call the <code>reset()</code> + * <p>After file rotation, the HdfsBolt will call the <code>reset()</code> * method. */ public interface FileRotationPolicy extends Serializable { diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java index 2a512c4..18790d8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java @@ -12,7 +12,6 @@ package org.apache.storm.hdfs.trident.rotation; - import org.apache.storm.trident.tuple.TridentTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,19 +20,19 @@ import org.slf4j.LoggerFactory; * File rotation policy that will rotate files when a certain * file size is reached. * - * For example: + * <p>For example: * <pre> * // rotate when files reach 5MB * FileSizeRotationPolicy policy = * new FileSizeRotationPolicy(5.0, Units.MB); * </pre> - * */ public class FileSizeRotationPolicy implements FileRotationPolicy { private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class); private long maxBytes; private long lastOffset = 0; private long currentBytesWritten = 0; + public FileSizeRotationPolicy(float count, Units units) { this.maxBytes = (long) (count * units.getByteCount()); } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java index 2508a07..4539464 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java @@ -23,12 +23,12 @@ import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.storm.trident.tuple.TridentTuple; - public class TimedRotationPolicy implements FileRotationPolicy { private long interval; private Timer rotationTimer; private AtomicBoolean rotationTimerTriggered = new AtomicBoolean(); + public TimedRotationPolicy(float count, TimeUnit units) { this.interval = (long) (count * units.getMilliSeconds()); } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java index 15f0c6b..f98dbdf 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java @@ -12,7 +12,6 @@ package org.apache.storm.hdfs.trident.sync; - import org.apache.storm.trident.tuple.TridentTuple; /** diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java index 1a30bfa..469a808 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java @@ -245,7 +245,7 @@ public class TestFileLock { expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); Assert.assertNotNull(expired); - Assert.assertEquals("spout3", expired.getValue().componentID); + Assert.assertEquals("spout3", expired.getValue().componentId); } finally { lock1.release(); lock2.release();