[ https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397520#comment-15397520 ]
Cody Koeninger commented on SPARK-16746:
----------------------------------------

From the conversation on the mailing list, it wasn't clear whether this was using the official Kafka integration or the dibbhatt one from spark-packages. If the latter, it should probably be brought up with him first.

> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-16746
>                 URL: https://issues.apache.org/jira/browse/SPARK-16746
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Hongyao Zhao
>            Priority: Minor
>
> I wrote a Spark Streaming program that consumes 1000 messages from one Kafka topic, applies some transformations, and writes the results back to another topic. However, I found only 988 messages in the second topic. I checked the logs and confirmed that all messages were received by the receivers, but I also found an HDFS write timeout message printed from the class BatchedWriteAheadLog.
>
> I checked the source code and found the following:
>
> {code:borderStyle=solid}
> /** Add received block. This event will get written to the write ahead log (if enabled).
>   */
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
>     try {
>       val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
>       if (writeResult) {
>         synchronized {
>           getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
>         }
>         logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
>           s"block ${receivedBlockInfo.blockStoreResult.blockId}")
>       } else {
>         logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
>           s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
>       }
>       writeResult
>     } catch {
>       case NonFatal(e) =>
>         logError(s"Error adding block $receivedBlockInfo", e)
>         false
>     }
>   }
> {code}
>
> It seems that ReceiverTracker tries to write the block info to HDFS, but the write operation times out. This causes writeToLog to return false, so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo" is skipped and the block info is lost.
> The Spark version I use is 1.6.1, and I did not turn on spark.streaming.receiver.writeAheadLog.enable.
>
> I want to know whether or not this is a designed behaviour.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
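To make the reported failure mode concrete, here is a minimal, self-contained Scala sketch. All names are hypothetical stand-ins, not Spark's actual ReceiverTracker classes: it models how a block whose write-ahead-log write fails (e.g. an HDFS timeout) never reaches the received-block queue and is silently dropped, while the caller only sees a `false` return value.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Simplified model of the code path quoted above. `WalDropSketch`,
// `BlockInfo`, and the `walSucceeds` flag are illustrative assumptions,
// not Spark APIs.
object WalDropSketch {
  final case class BlockInfo(streamId: Int, blockId: String)

  private val queues = mutable.Map.empty[Int, mutable.Queue[BlockInfo]]

  def getReceivedBlockQueue(streamId: Int): mutable.Queue[BlockInfo] =
    queues.getOrElseUpdate(streamId, mutable.Queue.empty)

  // Stand-in for the write-ahead-log write; `succeed = false`
  // simulates the HDFS write timing out.
  private def writeToLog(block: BlockInfo, succeed: Boolean): Boolean = succeed

  def addBlock(block: BlockInfo, walSucceeds: Boolean): Boolean =
    try {
      val writeResult = writeToLog(block, walSucceeds)
      if (writeResult) {
        // Only a successful WAL write enqueues the block.
        synchronized { getReceivedBlockQueue(block.streamId) += block }
      }
      // A false result is only logged at debug level in Spark 1.6,
      // so the dropped block is easy to miss.
      writeResult
    } catch {
      case NonFatal(_) => false
    }

  def main(args: Array[String]): Unit = {
    addBlock(BlockInfo(0, "block-1"), walSucceeds = true)
    addBlock(BlockInfo(0, "block-2"), walSucceeds = false) // simulated timeout
    // block-2 is silently dropped, mirroring the 988-of-1000 observation.
    println(getReceivedBlockQueue(0).map(_.blockId).mkString(","))
  }
}
```

Running `main` prints only `block-1`: the block whose WAL write "timed out" never entered the queue, which matches the reporter's description of the lost records.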