[ https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14585389#comment-14585389 ]
Apache Spark commented on SPARK-8367:
-------------------------------------

User 'SaintBacchus' has created a pull request for this issue:
https://github.com/apache/spark/pull/6818

> ReliableKafka will lose data when `spark.streaming.blockInterval` is 0
> -----------------------------------------------------------------------
>
>                 Key: SPARK-8367
>                 URL: https://issues.apache.org/jira/browse/SPARK-8367
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.0
>            Reporter: SaintBacchus
>
> {code:title=BlockGenerator.scala|borderStyle=solid}
> /** Change the buffer to which single records are added to. */
> private def updateCurrentBuffer(time: Long): Unit = synchronized {
>   try {
>     val newBlockBuffer = currentBuffer
>     currentBuffer = new ArrayBuffer[Any]
>     if (newBlockBuffer.size > 0) {
>       val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
>       val newBlock = new Block(blockId, newBlockBuffer)
>       listener.onGenerateBlock(blockId)
>       blocksForPushing.put(newBlock)  // put is blocking when queue is full
>       logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
>     }
>   } catch {
>     case ie: InterruptedException =>
>       logInfo("Block updating timer thread was interrupted")
>     case e: Exception =>
>       reportError("Error in block updating thread", e)
>   }
> }
> {code}
> If *spark.streaming.blockInterval* is 0, the *blockId* computed above is always the same: the block timer's period is *blockIntervalMs*, so with a period of 0 *time* never advances, and *time - blockIntervalMs* evaluates to the same value on every tick.
> {code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
> private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
>   // Get a snapshot of current offset map and store with related block id.
>   val offsetSnapshot = topicPartitionOffsetMap.toMap
>   blockOffsetMap.put(blockId, offsetSnapshot)
>   topicPartitionOffsetMap.clear()
> }
> {code}
> Because the *blockId* never changes, each call to *rememberBlockOffsets* overwrites the previous offset snapshot, and Streaming commits offsets before the corresponding data has actually been consumed: the data is still waiting to be stored, but its offsets have already been recorded and committed together with an earlier block.
> So when an exception occurs, the offsets have already been committed, but the data is lost, since it existed only in memory and had not been consumed yet.
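To make the failure mode concrete, here is a minimal, self-contained Scala sketch. The case class below is a local stand-in for org.apache.spark.storage.StreamBlockId, the timer loop is simulated by a plain for-loop, and the offsets are made up; only the id arithmetic mirrors the quoted BlockGenerator code. With blockIntervalMs = 0, every tick produces the same id, so each snapshot put into blockOffsetMap silently replaces the previous one:

{code:title=BlockIdCollisionSketch.scala|borderStyle=solid}
import scala.collection.mutable

// Local stand-in for org.apache.spark.storage.StreamBlockId (declared here
// only so the sketch is runnable on its own).
case class StreamBlockId(streamId: Int, uniqueId: Long)

object BlockIdCollisionSketch {
  def main(args: Array[String]): Unit = {
    val blockIntervalMs = 0L   // spark.streaming.blockInterval set to 0
    val receiverId = 0
    val startTime = 1000L      // whatever wall-clock time the timer starts at

    val blockOffsetMap = mutable.HashMap.empty[StreamBlockId, Map[String, Long]]
    for (tick <- 1 to 3) {
      // With a period of 0 the recurring timer never advances `time`,
      // so every tick computes the identical block id.
      val time = startTime
      val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
      val offsetSnapshot = Map("topic-0" -> tick * 100L)  // made-up Kafka offsets
      blockOffsetMap.put(blockId, offsetSnapshot)         // overwrites the old entry!
      println(s"tick $tick -> $blockId, entries in map: ${blockOffsetMap.size}")
    }
    // The map always holds exactly one entry: offsets for data that was never
    // stored have been replaced, which is the data-loss window in the report.
  }
}
{code}

Running it prints "entries in map: 1" on every tick, so when the single stored block's offsets are committed, offsets for data that never reached the block store are committed along with it.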
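One natural fix direction (and plausibly what the linked pull request does, though the sketch below is not a quote of the patch and its class and object names are assumptions) is to validate the interval at configuration time, so the timer can never produce two blocks with the same time - blockIntervalMs id:

{code:title=BlockIntervalGuard.scala|borderStyle=solid}
// Illustrative constructor-time guard, modelled loosely on the shape of
// BlockGenerator but simplified; the names here are hypothetical.
class BlockGeneratorSketch(blockIntervalMs: Long) {
  require(blockIntervalMs > 0,
    s"'spark.streaming.blockInterval' should be a positive value, got $blockIntervalMs")
  // ... timer and buffer setup would follow here ...
}

object BlockIntervalGuard {
  def main(args: Array[String]): Unit = {
    new BlockGeneratorSketch(200L)  // fine: a 200 ms interval, like the default
    try {
      new BlockGeneratorSketch(0L)  // rejected before any timer is started
    } catch {
      case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}")
    }
  }
}
{code}

Failing fast here turns a silent data-loss window into an immediate, diagnosable configuration error.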