Re: Spark streaming lost data when ReceiverTracker's BlockInfo write to HDFS times out

2016-07-26 Thread Cody Koeninger
Can you go ahead and open a Jira ticket with that explanation?

Is there a reason you need to use receivers instead of the direct stream?

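For reference, a minimal sketch of the direct approach, assuming the
Spark 1.6 / Kafka 0.8 integration (spark-streaming-kafka); the broker
address, topic name, and object name below are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("direct-stream-sketch"), Seconds(5))

    // Placeholder broker list and topic; substitute your own.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val topics = Set("input-topic")

    // No receivers involved: each batch reads its offset range directly
    // from Kafka, so no block metadata goes through the ReceiverTracker
    // or its write ahead log.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).print()  // stand-in for the real transformation

    ssc.start()
    ssc.awaitTermination()
  }
}

With this approach there are no receiver blocks to track, which avoids
the addBlock path discussed below entirely.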
On Tue, Jul 26, 2016 at 4:45 AM, Andy Zhao <andyrao1...@gmail.com> wrote:
> Hi guys,
>
> I wrote a Spark Streaming program that consumed 1000 messages from one
> Kafka topic, did some transformations, and wrote the results back to
> another topic. But I only found 988 messages in the second topic. I
> checked the logs and confirmed that all messages were received by the
> receivers. However, I found an HDFS write timeout message printed by the
> class BatchedWriteAheadLog.
>
> I checked out the source code and found code like this:
>
> /** Add received block. This event will get written to the write ahead log (if enabled). */
> def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
>   try {
>     val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
>     if (writeResult) {
>       synchronized {
>         getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
>       }
>       logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
>         s"block ${receivedBlockInfo.blockStoreResult.blockId}")
>     } else {
>       logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
>         s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
>     }
>     writeResult
>   } catch {
>     case NonFatal(e) =>
>       logError(s"Error adding block $receivedBlockInfo", e)
>       false
>   }
> }
>
>
> It seems that the ReceiverTracker tried to write the block info to HDFS,
> but the write operation timed out. This caused the writeToLog function to
> return false, so the line "getReceivedBlockQueue(receivedBlockInfo.streamId)
> += receivedBlockInfo" was skipped and the block info was lost.
>
> The Spark version I use is 1.6.1, and I did not turn on
> spark.streaming.receiver.writeAheadLog.enable.
>
> I want to know whether this is intended behaviour.
>
> Thanks
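
For completeness, a minimal sketch of enabling the receiver write ahead
log mentioned above, assuming Spark 1.6; the object name, checkpoint
path, and host/port are placeholders, and note that the WAL is written
under the checkpoint directory, so checkpointing must be set:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverWalSketch {
  def main(args: Array[String]): Unit = {
    // Log received block data and metadata through the WAL instead of
    // dropping them when something goes wrong on the driver.
    val conf = new SparkConf()
      .setAppName("receiver-wal-sketch")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(5))

    // The WAL lives under the checkpoint directory; placeholder path.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

    // Placeholder receiver-based source; socketTextStream uses a receiver.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}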
