Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2596#discussion_r176908208
--- Diff:
examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
---
@@ -53,79 +78,197 @@ public static void main(String[] args) throws
Exception {
Fields esFields = new Fields("index", "type", "source");
EsTupleMapper tupleMapper =
EsTestUtil.generateDefaultTupleMapper();
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
- TridentState state = stream.partitionPersist(factory, esFields,
new EsUpdater(), new Fields());
+ TridentState state = stream.partitionPersist(factory,
+ esFields,
+ new EsUpdater(),
+ new Fields());
EsTestUtil.startEsNode();
- EsTestUtil.waitForSeconds(5);
+ EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
- StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(),
topology.build());
+ StormSubmitter.submitTopology(TOPOLOGY_NAME,
+ new Config(),
+ topology.build());
}
+ /**
+ * A fixed batch spout.
+ */
public static class FixedBatchSpout implements IBatchSpout {
- int maxBatchSize;
- HashMap<Long, List<List<Object>>> batches = new HashMap<Long,
List<List<Object>>>();
+ private static final long serialVersionUID = 1L;
+ /**
+ * The maximum batch size.
+ */
+ private int maxBatchSize;
+ /**
+ * The passed batches.
+ */
+ private HashMap<Long, List<List<Object>>> batches = new
HashMap<>();
+ /**
+ * The output values.
+ */
private Values[] outputs = {
- new Values("{\"user\":\"user1\"}", "index1", "type1",
UUID.randomUUID().toString()),
- new Values("{\"user\":\"user2\"}", "index1", "type2",
UUID.randomUUID().toString()),
- new Values("{\"user\":\"user3\"}", "index2", "type1",
UUID.randomUUID().toString()),
- new Values("{\"user\":\"user4\"}", "index2", "type2",
UUID.randomUUID().toString())
+ new Values("{\"user\":\"user1\"}",
+ "index1",
+ "type1",
+ UUID.randomUUID().toString()),
+ new Values("{\"user\":\"user2\"}",
+ "index1",
+ "type2",
+ UUID.randomUUID().toString()),
+ new Values("{\"user\":\"user3\"}",
+ "index2",
+ "type1",
+ UUID.randomUUID().toString()),
+ new Values("{\"user\":\"user4\"}",
+ "index2",
+ "type2",
+ UUID.randomUUID().toString())
};
+ /**
+ * The current index.
+ */
private int index = 0;
- boolean cycle = false;
+ /**
+ * A flag indicating whether cycling ought to be performed.
+ */
+ private boolean cycle = false;
- public FixedBatchSpout(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
+ /**
+ * Creates a new fixed batch spout.
+ * @param maxBatchSizeArg the maximum batch size to set
+ */
+ public FixedBatchSpout(final int maxBatchSizeArg) {
+ this.maxBatchSize = maxBatchSizeArg;
}
- public void setCycle(boolean cycle) {
- this.cycle = cycle;
+ /**
+ * Sets the cycle flag.
+ * @param cycleArg the cycle flag value
+ */
+ public void setCycle(final boolean cycleArg) {
+ this.cycle = cycleArg;
}
+ /**
+ * Gets the output fields.
+ * @return the output fields.
+ */
@Override
public Fields getOutputFields() {
return new Fields("source", "index", "type", "id");
}
+ /**
+ * Open the topology.
+ * @param conf the configuration to use for opening
+ * @param context the context to use for opening
+ */
@Override
- public void open(Map<String, Object> conf, TopologyContext
context) {
+ public void open(final Map<String, Object> conf,
+ final TopologyContext context) {
index = 0;
}
+ /**
+ * Emits a batch.
+ * @param batchId the batch id to use
+ * @param collector the collector to emit to
+ */
@Override
- public void emitBatch(long batchId, TridentCollector collector) {
- List<List<Object>> batch = this.batches.get(batchId);
+ public void emitBatch(final long batchId,
+ final TridentCollector collector) {
+ List<List<Object>> batch = this.getBatches().get(batchId);
if (batch == null) {
batch = new ArrayList<List<Object>>();
- if (index >= outputs.length && cycle) {
+ if (index >= outputs.length && isCycle()) {
index = 0;
}
- for (int i = 0; i < maxBatchSize; index++, i++) {
+ for (int i = 0; i < getMaxBatchSize(); index++, i++) {
if (index == outputs.length) {
index = 0;
}
batch.add(outputs[index]);
}
- this.batches.put(batchId, batch);
+ this.getBatches().put(batchId, batch);
}
for (List<Object> list : batch) {
collector.emit(list);
}
}
+ /**
+ * Acknoledges the message with id {@code msgId}.
--- End diff --
Acknowledges
---