Re: Issues in recovering state from last crash using custom sink

2017-08-29 Thread Aljoscha Krettek
Hi,

Unless you manually take a savepoint and then restore from that savepoint, 
state will not be restored. Simply stopping a job and then restarting it does 
not restore state. Regular checkpoints are only used for recovery when a job 
fails, not after a user-initiated shutdown.
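
To make the distinction concrete, here is a minimal sketch of the
checkpoint-related job setup in Scala, assuming a Flink 1.x DataStream job.
The object name, the checkpoint interval and the restart values are
illustrative assumptions and are not taken from the original application.
It only shows the configuration under which checkpoints are used for
automatic recovery. A manual stop and restart is a different path: there you
would typically take a savepoint with `flink savepoint <jobID>` and resume
with `flink run -s <savepointPath> ...`.

```scala
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointConfigSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Take a checkpoint every 60 seconds (illustrative interval).
    env.enableCheckpointing(60000L)

    // On failure, restart up to 3 times, 10 seconds apart (illustrative values).
    // Each automatic restart restores state from the latest completed checkpoint.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))

    // The pipeline would be built here, followed by env.execute(...).
    // Cancelling the job and submitting it again starts with empty state
    // unless the new submission is started from a savepoint.
  }
}
```

To exercise checkpoint recovery in a test, the job has to fail while it is
running (for example through an exception thrown in an operator), rather than
being stopped and resubmitted.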

Best,
Aljoscha



Re: Issues in recovering state from last crash using custom sink

2017-08-28 Thread vipul singh
Hi Aljoscha,

Yes.
I run the application until a few checkpoints have completed. I then stop
the application between two checkpoints, so there will be messages in the
list state, which should be checkpointed when *snapshot* is called. I can
see a checkpoint file on S3 (I am saving the checkpoints to S3 using the
RocksDB state backend). On restarting the application, I add a debug point
here, to see if there are any messages in checkpointedMessages, but as shown
below, the list is empty.

Do you think there might be an error in the way I am trying to retrieve
messages?


def snapshotState(context: FunctionSnapshotContext): Unit = {
  // Replace the previously checkpointed messages with the current buffer
  checkpointedMessages.clear()
  bufferredMessages.foreach(checkpointedMessages.add)
  pendingFiles synchronized {
    if (pendingFiles.nonEmpty) {
      // we have a list of pending files
      // we move all files to S3 (that's the sink in our case)
      // and after that we delete these files
    }
    pendingFiles.clear()
  }
}

def initializeState(context: FunctionInitializationContext): Unit = {
  // Check if files already exist in /tmp;
  // this might be the case if the program crashed before these files were
  // uploaded to S3.
  // We need to recover (upload these files to S3 and clear the directory).
  handlePreviousPendingFiles()
  checkpointedMessages = context.getOperatorStateStore.getListState(
    new ListStateDescriptor[Message](STATE_UID,
      TypeInformation.of(new TypeHint[Message]() {})))
  import scala.collection.JavaConversions._
  for (message <- checkpointedMessages.get) {
    bufferredMessages.add(message)
  }
}

From my understanding of the above code, upon checkpointing, the messages
contained in checkpointedMessages are included in the snapshot, and when
*initializeState* is called, it will try to recover these messages from the
last checkpoint?
Do you think the error is in the way I am trying to get the ListBuffer
elements from the last checkpoint?

checkpointedMessages = context.getOperatorStateStore.getListState(
  new ListStateDescriptor[Message](STATE_UID,
    TypeInformation.of(new TypeHint[Message]() {})))
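
For comparison, a minimal self-contained sketch of a buffering sink that
implements CheckpointedFunction might look like the following. It assumes a
Flink 1.x DataStream job. The `Message` case class, the class name
`BufferingArchiveSink` and the state name "buffered-messages" are
hypothetical stand-ins, and the file handling and S3 upload are left out. It
is not the code from the application above, only an illustration of how the
list state is written in snapshotState and read back in initializeState.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the application's own Message class.
case class Message(key: String, payload: String)

class BufferingArchiveSink extends SinkFunction[Message] with CheckpointedFunction {

  // Handle to the operator list state; assigned in initializeState.
  @transient private var checkpointedMessages: ListState[Message] = _

  // In-memory buffer of messages that have not been written out yet.
  private val bufferedMessages = ListBuffer.empty[Message]

  override def invoke(value: Message): Unit = {
    bufferedMessages += value
    // Writing the buffer to files and uploading to S3 is omitted in this sketch.
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Replace the state contents with the current buffer.
    checkpointedMessages.clear()
    bufferedMessages.foreach(m => checkpointedMessages.add(m))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Message](
      "buffered-messages", // hypothetical state name
      TypeInformation.of(classOf[Message]))
    checkpointedMessages = context.getOperatorStateStore.getListState(descriptor)

    // isRestored is true only when the job was started from a checkpoint or
    // savepoint; on a fresh start the list state is empty.
    if (context.isRestored) {
      checkpointedMessages.get().asScala.foreach(bufferedMessages += _)
    }
  }
}
```

Logging the value of `context.isRestored` in initializeState is a cheap way
to tell whether an empty list means "nothing was restored" or "the job was
started without any snapshot at all".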

Please let me know!

Thanks,
Vipul



Re: Issues in recovering state from last crash using custom sink

2017-08-28 Thread Aljoscha Krettek
Hi,

How are you testing the recovery behaviour? Are you taking a savepoint, then 
shutting down, and then restarting the job from the savepoint?

Best,
Aljoscha




Issues in recovering state from last crash using custom sink

2017-08-27 Thread vipul singh
Hi all,

I am working on a Flink archiver application. In short, this application
reads a bunch of schematized messages from Kafka and archives them to S3.
Due to the way the files need to be named, I had to write a custom sink
implementation. At this point, the application is generally able to archive
files to S3 correctly.
I am having some issues during the recovery phase. A sample of the code can
be found on link. My issue is that on recovery, when initializeState is
called, it is not able to get (recover) the last checkpointed ListState
(i.e. checkpointedMessages has size 0). I think this might be because of
the way I am retrieving the checkpointed messages. Could someone please
point me to what is wrong, or direct me to some examples which do a similar
thing? (Please note that the Message class is our own implementation.)

Thanks,
Vipul