[
https://issues.apache.org/jira/browse/STORM-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699056#comment-14699056
]
ASF GitHub Bot commented on STORM-969:
--------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/664#discussion_r37160668
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java ---
@@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
@Override
public void execute(Tuple tuple) {
- try {
- byte[] bytes = this.format.format(tuple);
- synchronized (this.writeLock) {
- out.write(bytes);
- this.offset += bytes.length;
-
- if (this.syncPolicy.mark(tuple, this.offset)) {
- if (this.out instanceof HdfsDataOutputStream) {
-                    ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
- } else {
- this.out.hsync();
- }
- this.syncPolicy.reset();
+ boolean forceRotate = false;
+ synchronized (this.writeLock) {
+ boolean forceSync = false;
+ if (TupleUtils.isTick(tuple)) {
+ LOG.debug("TICK! forcing a file system flush");
+ forceSync = true;
+ }
+ else {
+ try {
+ writeAndAddTuple(tuple);
+ } catch (IOException e) {
+                    //If the write failed, try to sync anything already written
+                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
+ this.collector.reportError(e);
+ forceSync = true;
+ this.collector.fail(tuple);
}
}
- this.collector.ack(tuple);
+ if (this.syncPolicy.mark(tuple, this.offset) || forceSync) {
+ try {
+ syncAndAckTuples();
+ } catch (IOException e) {
+                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
+ this.collector.reportError(e);
+ //Force rotation to get a new file handle
+ forceRotate = true;
+ for (Tuple t : tupleBatch)
+ this.collector.fail(t);
+ tupleBatch.clear();
+ }
+ }
+ }
- if(this.rotationPolicy.mark(tuple, this.offset)){
- rotateOutputFile(); // synchronized
- this.offset = 0;
- this.rotationPolicy.reset();
+ if(this.rotationPolicy.mark(tuple, this.offset) || forceRotate) {
+ try {
+ rotateAndReset();
+ } catch (IOException e) {
+ this.collector.reportError(e);
+ LOG.warn("File could not be rotated");
+                //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
+                //The next tuple will almost certainly fail to write and/or sync, which will force a rotation. That
+                //will give rotateAndReset() a chance to work, which includes creating a fresh file handle.
}
- } catch (IOException e) {
- this.collector.reportError(e);
- this.collector.fail(tuple);
}
}
+ private void rotateAndReset() throws IOException {
+ rotateOutputFile(); // synchronized
+ this.offset = 0;
+ this.rotationPolicy.reset();
+ }
+
+ private void syncAndAckTuples() throws IOException {
--- End diff ---
An optimization could be to do the sync only if some tuples were actually
written out since the last sync.
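The reviewer's suggestion amounts to tracking whether any writes have landed since the last sync and skipping the (relatively expensive) hsync otherwise. A minimal standalone sketch of that idea follows; the class and method names (`SyncTracker`, `maybeSync`) are illustrative stand-ins, not part of the actual HdfsBolt code, and the list of strings stands in for the batch of pending tuples.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: skip the sync when nothing was written since the last one.
class SyncTracker {
    private final List<String> tupleBatch = new ArrayList<>();
    int syncCount = 0; // counts how many real syncs were issued

    void write(String tuple) {
        tupleBatch.add(tuple); // stand-in for out.write(bytes)
    }

    // Only sync if something was actually written since the last sync.
    boolean maybeSync() {
        if (tupleBatch.isEmpty()) {
            return false;       // nothing new, skip the hsync() call
        }
        syncCount++;            // stand-in for out.hsync()
        tupleBatch.clear();     // these tuples are durable; ack them here
        return true;
    }
}

public class SyncSketch {
    public static void main(String[] args) {
        SyncTracker t = new SyncTracker();
        t.write("a");
        System.out.println(t.maybeSync()); // true: one tuple was pending
        System.out.println(t.maybeSync()); // false: nothing written since
        System.out.println(t.syncCount);   // 1
    }
}
```

In the real bolt this would mean consulting `tupleBatch.isEmpty()` (or a dirty flag) before calling `hsync()`, even when a tick tuple forces a flush.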
> HDFS Bolt can end up in an unrecoverable state
> ----------------------------------------------
>
> Key: STORM-969
> URL: https://issues.apache.org/jira/browse/STORM-969
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-hdfs
> Reporter: Aaron Dossett
> Assignee: Aaron Dossett
>
> The body of the HDFSBolt.execute() method is essentially one try-catch block.
> The catch block reports the error and fails the current tuple. In some
> cases the bolt's FSDataOutputStream object (named 'out') is in an
> unrecoverable state and no subsequent calls to execute() can succeed.
> To produce this scenario:
> - process some tuples through HDFS bolt
> - put the underlying HDFS system into safemode
> - process some more tuples and receive a correct ClosedChannelException
> - take the underlying HDFS system out of safemode
> - subsequent tuples continue to fail with the same exception
> The three fundamental operations that execute() performs (writing, syncing,
> rotating) need to be isolated so that errors from each can be handled
> specifically.
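The isolation the issue calls for can be sketched as three separate try-catch blocks, where a failed write forces a sync and a failed sync forces a rotation that replaces the broken file handle. The sketch below is self-contained and hypothetical: `FakeStream` stands in for HDFS's FSDataOutputStream (it simulates a stream wedged by safemode), and the control flow mirrors the patch in spirit, not line for line.

```java
import java.io.IOException;

// Stand-in for FSDataOutputStream; "broken" simulates safemode wedging.
class FakeStream {
    boolean broken;
    FakeStream(boolean broken) { this.broken = broken; }
    void write(String s) throws IOException {
        if (broken) throw new IOException("simulated ClosedChannelException");
    }
    void hsync() throws IOException {
        if (broken) throw new IOException("stream is wedged");
    }
}

public class IsolationSketch {
    FakeStream out = new FakeStream(true); // start with a wedged handle
    boolean forceRotate = false;

    void execute(String tuple) {
        boolean forceSync = false;
        try {
            out.write(tuple);           // operation 1: write
        } catch (IOException e) {
            forceSync = true;           // fail the tuple, still try to flush
        }
        if (forceSync) {
            try {
                out.hsync();            // operation 2: sync
            } catch (IOException e) {
                forceRotate = true;     // broken handle: force a rotation
            }
        }
        if (forceRotate) {
            try {
                rotate();               // operation 3: rotate
                forceRotate = false;
            } catch (IOException e) {
                // nothing to do; the next tuple retries the whole chain
            }
        }
    }

    void rotate() throws IOException {
        out = new FakeStream(false);    // fresh handle once safemode exits
    }

    public static void main(String[] args) throws IOException {
        IsolationSketch bolt = new IsolationSketch();
        bolt.execute("t1");             // write and sync fail, rotation recovers
        bolt.out.write("t2");           // the new handle accepts writes again
    }
}
```

This is exactly the scenario from the reproduction steps: before the fix, the wedged handle made every subsequent execute() fail; with isolated error handling, the sync failure triggers a rotation that restores a usable stream.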
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)