[ https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15398615#comment-15398615 ]
Hongyao Zhao commented on SPARK-16746:
--------------------------------------

I did some testing yesterday. It seems that the Spark 1.6 direct API can consume messages from Kafka 0.9 brokers, so I can work around this problem by using the direct API. That is good news for me, but I think the issue I reported has nothing to do with which kind of receiver I use, because ReceiverTracker is an internal class in the Spark source code.

> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-16746
>                 URL: https://issues.apache.org/jira/browse/SPARK-16746
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Hongyao Zhao
>            Priority: Minor
>
> I wrote a Spark Streaming program that consumes 1000 messages from one Kafka topic, applies some transformations, and writes the results back to another topic, but I found only 988 messages in the second topic. I checked the logs and confirmed that all messages were received by the receivers, but I also found an HDFS write timeout message printed by the class BatchedWriteAheadLog.
>
> I checked the source code and found the following:
>
> {code:borderStyle=solid}
> /** Add received block. This event will get written to the write ahead
>   * log (if enabled). */
> def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
>   try {
>     val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
>     if (writeResult) {
>       synchronized {
>         getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
>       }
>       logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
>         s"block ${receivedBlockInfo.blockStoreResult.blockId}")
>     } else {
>       logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
>         s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
>     }
>     writeResult
>   } catch {
>     case NonFatal(e) =>
>       logError(s"Error adding block $receivedBlockInfo", e)
>       false
>   }
> }
> {code}
>
> It seems that ReceiverTracker tries to write the block info to HDFS, but the write operation times out. This causes writeToLog to return false, so the statement "getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo" is skipped and the block info is lost.
> The Spark version I use is 1.6.1, and I did not turn on spark.streaming.receiver.writeAheadLog.enable.
>
> I want to know whether or not this is the intended behaviour.
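
For reference, a minimal sketch of the direct-API workaround mentioned above, written against the Spark 1.6 spark-streaming-kafka API. The broker list, topic names, batch interval, and the map transformation are placeholders, and the producer that writes back to the output topic is only indicated by a comment:

{code:borderStyle=solid}
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamWorkaround {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamWorkaround")
    // 5-second batches; adjust to the real workload
    val ssc = new StreamingContext(conf, Seconds(5))

    // With the direct stream the driver tracks Kafka offsets itself,
    // so no receiver blocks go through ReceiverTracker.addBlock.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("input-topic")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    messages
      .map { case (_, value) => value } // apply the real transformation here
      .foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          // create a Kafka producer per partition and send each record
          // to the output topic (producer code omitted)
          partition.foreach(record => println(record))
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}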