STORM-711: modifying all connectors to use collector.reportError instead of logging and use tuple anchoring when emitting a tuple.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ffb1562c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ffb1562c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ffb1562c Branch: refs/heads/nimbus-ha-branch Commit: ffb1562cb0eb3fbd916423136fcd3c181fbf6212 Parents: 2666d99 Author: Parth Brahmbhatt <[email protected]> Authored: Wed Mar 18 10:15:53 2015 -0700 Committer: Parth Brahmbhatt <[email protected]> Committed: Wed Mar 18 10:49:27 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java | 2 +- .../main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java | 4 ++-- .../java/org/apache/storm/hbase/trident/state/HBaseState.java | 2 +- .../src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java | 2 +- .../main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java index f7f0886..cf29aa5 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java @@ -62,7 +62,7 @@ public class HBaseBolt extends AbstractHBaseBolt { try { this.hBaseClient.batchMutate(mutations); } catch(Exception e){ - LOG.warn("Failing tuple. Error writing rowKey " + rowKey, e); + this.collector.reportError(e); this.collector.fail(tuple); return; } http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java index c6838be..fd32f50 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java @@ -67,11 +67,11 @@ public class HBaseLookupBolt extends AbstractHBaseBolt { try { Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0]; for(Values values : rowToTupleMapper.toValues(tuple, result)) { - this.collector.emit(values); + this.collector.emit(tuple, values); } this.collector.ack(tuple); } catch (Exception e) { - LOG.warn("Could not perform Lookup for rowKey =" + rowKey + " from Hbase.", e); + this.collector.reportError(e); this.collector.fail(tuple); } } http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java index 71ab7c4..04518ca 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java @@ -141,7 +141,7 @@ public class HBaseState implements State { try { hBaseClient.batchMutate(mutations); } catch (Exception e) { - LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e); + collector.reportError(e); throw new FailedException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java index a416357..dcb09e7 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java @@ -112,7 +112,7 @@ public class HdfsBolt extends AbstractHdfsBolt{ this.rotationPolicy.reset(); } } catch (IOException e) { - LOG.warn("write/sync failed.", e); + this.collector.reportError(e); this.collector.fail(tuple); } } http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java index fc9bb4f..baf4df0 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java @@ -123,7 +123,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt { this.rotationPolicy.reset(); } } catch (IOException e) { - LOG.warn("write/sync failed.", e); + this.collector.reportError(e); this.collector.fail(tuple); }
