delete all available flink timers on app start

2019-01-16 Thread vipul singh
Hello,

I have a custom app which restarts when it hits some exception, and on restart
I want to cancel all registered flink timers in the initializeState method.
Based on the documentation, my understanding is that timer state is saved as
part of the checkpointed state, so if the app restarts the timers are still active.

Is there a way to delete all available timers on app crash and restart?
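The closest I have come up with so far is tracking the registered timestamp myself
in keyed state and deleting timers one by one, since I could not find an API to list
or bulk-delete timers and the TimerService only deletes a timer by its exact timestamp.
A rough sketch of what I mean (the Event type, state name and 60s delay are just
placeholders, and this assumes a keyed stream with event-time timestamps assigned):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String) // placeholder event type

class TimerCleanupFunction extends ProcessFunction[Event, Event] {

  // keyed state remembering the timestamp of the timer we registered for this key
  private lazy val timerTs: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("timer-ts", classOf[java.lang.Long]))

  override def processElement(value: Event,
                              ctx: ProcessFunction[Event, Event]#Context,
                              out: Collector[Event]): Unit = {
    // a timer restored from a previous run is still active; drop it before registering a new one
    Option(timerTs.value()).foreach(ts => ctx.timerService().deleteEventTimeTimer(ts))

    val ts = ctx.timestamp() + 60000L
    ctx.timerService().registerEventTimeTimer(ts)
    timerTs.update(ts)

    out.collect(value)
  }
}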

-- 
Thanks,
Vipul


Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-28 Thread vipul singh
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in
snapshotState() (since you are checkpointing, this method will be called on every checkpoint).
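Roughly what I have in mind is something like this (just a sketch, not tested; it
rolls to a new file per checkpoint so the freshly closed file is not overwritten,
and the builder options are trimmed down):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

class ClosingParquetSink(basePath: String, schema: Schema)
    extends SinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var writer: ParquetWriter[GenericRecord] = _

  private def openWriter(): ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](
        new Path(s"$basePath/part-${System.currentTimeMillis()}.parquet"))
      .withSchema(schema)
      .build()

  override def invoke(value: GenericRecord): Unit = {
    if (writer == null) writer = openWriter() // lazily (re)open after a checkpoint
    writer.write(value)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // close() flushes the writer's buffered row groups to the file;
    // without it the data stays in memory and the parquet file looks empty
    if (writer != null) {
      writer.close()
      writer = null
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // nothing to restore; a fresh writer is opened on the next invoke()
  }
}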

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi  wrote:

> Thanks Rafi,
> I am actually not using assignTimestampsAndWatermarks, I will try to add
> it as you suggested. However, it seems that the messages are repeating in the
> stream over and over; even if I push a single message manually to the
> queue, that message will repeat infinitely.
>
> Cheers
> Avi
>
>
> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch  wrote:
>
>> Hi Avi,
>>
>> I can't see the part where you use  assignTimestampsAndWatermarks.
>> If this part in not set properly, it's possible that watermarks are not
>> sent and nothing will be written to your Sink.
>>
>> See here for more details:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>> Hope this helps,
>> Rafi
>>
>> On Wed, Nov 28, 2018, 21:22 Avi Levi wrote:
>>> Hi,
>>>
>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>>> consists of kafka as source and a parquet file as sink, however it seems
>>> like the stream is repeating itself in an endless loop and the parquet file
>>> is not written. Can someone please help me with this?
>>>
>>> object ParquetSinkWriter{
>>>   private val path = new Path("tmp/pfile")
>>>   private val schemaString =
>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>   private val avroSchema: Schema = new
>>> Schema.Parser().parse(schemaString)
>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>   private   val config = ParquetWriterConfig()
>>>   val writer: ParquetWriter[GenericRecord] =
>>> AvroParquetWriter.builder[GenericRecord](path)
>>> .withSchema(avroSchema)
>>> .withCompressionCodec(compressionCodecName)
>>> .withPageSize(config.pageSize)
>>> .withRowGroupSize(config.blockSize)
>>> .withDictionaryEncoding(config.enableDictionary)
>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>> .withValidation(config.validating)
>>> .build()
>>> }
>>>
>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>> SinkFunction[GenericRecord] {
>>>   import ParquetSinkWriter._
>>>   override def invoke(value: GenericRecord): Unit = {
>>> println(s"ADDING TO File : $value") // getting this output
>>> writer.write(value) //the output is not written to the file
>>>   }
>>> }
>>>
>>> //main app
>>> object StreamingJob extends App  {
>>>  implicit val env: StreamExecutionEnvironment =
>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>   env.enableCheckpointing(500)
>>>
>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>
>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>> Time.seconds(3), Time.seconds(3)))
>>>   val backend: StateBackend = new
>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>   env.setStateBackend(backend)
>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>   val stream2: DataStream[DnsRequest] = env.addSource(//consume from
>>> kafka)
>>>   stream2.map { r =>
>>> println(s"MAPPING $r") //this output keeps repeating in a loop
>>> val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>> genericReocrd.put("qname", r.qname)
>>> genericReocrd.put("rcode", r.rcode)
>>> genericReocrd.put("ts", r.ts)
>>> genericReocrd
>>>   }.addSink(writer)
>>>
>>> Thanks for your help
>>> Avi
>>>
>>>

-- 
Thanks,
Vipul


Flink REST api for cancel with savepoint on yarn

2018-08-14 Thread vipul singh
Hello,

I have a question about the flink 1.5/1.6 REST endpoints. I was trying to see
how the rest endpoints have changed wrt cancelling with a savepoint; it
seems like now, to cancel with a savepoint, one needs to use the POST api
/jobs/:jobid/savepoints


While trying to run these queries on yarn, it seems like yarn currently
doesn't support these POST calls:
https://issues.apache.org/jira/browse/YARN-2031

On digging further and looking at an earlier email on this list,
it seems like there is a /jobs/:jobid/yarn-cancel api which does something
similar, but that doesn't support cancel-with-savepoint currently. Are there
any plans to add this functionality to this API in the future? Is there any
workaround for this for now?
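For reference, this is roughly the call I am trying to make (the target directory
is just a placeholder, and 8081 assumes the default REST port):

curl -X POST http://<jobmanager-host>:8081/jobs/<jobid>/savepoints \
  -H 'Content-Type: application/json' \
  -d '{"target-directory": "s3://my-bucket/savepoints", "cancel-job": true}'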

Thanks,
Vipul


Flink on kubernetes: taskmanager error

2018-07-27 Thread vipul singh
Hello,

I am trying to run flink on a kubernetes cluster using minikube and
kubectl. I am following an example which brings up a flink 1.2
cluster fine.

I am interested in running flink 1.5.1, but when I modify the flink
version, I start to see these exceptions in taskmanager-controller logs.
The exceptions are below:

2018-07-27 07:34:22,429 INFO  org.apache.flink.core.fs.FileSystem
> - Hadoop is not in the classpath/dependencies. The
> extended set of supported File Systems via Hadoop is not available.
>
> 2018-07-27 07:34:22,442 INFO
> org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
> create Hadoop Security Module because Hadoop cannot be found in the
> Classpath.
>
> 2018-07-27 07:34:22,460 INFO  org.apache.flink.runtime.security.SecurityUtils
>   - Cannot install HadoopSecurityContext because Hadoop
> cannot be found in the Classpath.
>
> 2018-07-27 07:34:22,622 WARN  org.apache.flink.configuration.Configuration
> - Config uses deprecated configuration key
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
>
> 2018-07-27 07:34:22,626 INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
> select the network interface and address to use by connecting to the
> leading JobManager.
>
> 2018-07-27 07:34:22,626 INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils-
> TaskManager will try to connect for 1 milliseconds before falling back
> to heuristics
>
> 2018-07-27 07:34:22,629 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Retrieved new target address
> taskmanager-controller-vncdz/172.17.0.7:6123.
>
> 2018-07-27 07:34:23,390 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Trying to connect to address
> taskmanager-controller-vncdz/172.17.0.7:6123
>
> 2018-07-27 07:34:23,391 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address
> 'taskmanager-controller-vncdz/172.17.0.7': Connection refused (Connection
> refused)
>
> 2018-07-27 07:34:23,391 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:23,392 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:23,392 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:23,393 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:23,393 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:24,195 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Trying to connect to address
> taskmanager-controller-vncdz/172.17.0.7:6123
>
> 2018-07-27 07:34:24,196 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address
> 'taskmanager-controller-vncdz/172.17.0.7': Connection refused (Connection
> refused)
>
> 2018-07-27 07:34:24,197 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:24,198 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:24,198 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:24,199 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:24,199 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:25,803 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Trying to connect to address
> taskmanager-controller-vncdz/172.17.0.7:6123
>
> 2018-07-27 07:34:25,811 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address
> 'taskmanager-controller-vncdz/172.17.0.7': Connection refused (Connection
> refused)
>
> 2018-07-27 07:34:25,811 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:25,812 INFO  

Re: Flink 1.6 release note!!

2018-06-11 Thread vipul singh
I think you are looking for this?
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Flink-1-6-features-tc20502.html

1.6 release notes as per current website:
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.6.html

Recently 1.5 was released:
https://flink.apache.org/news/2018/05/25/release-1.5.0.html



On Mon, Jun 11, 2018 at 10:24 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi
>
> can anybody please send the link or ref document for 1.6 release.
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>

-- 
Thanks,
Vipul


Having a backoff while experiencing checkpointing failures

2018-06-07 Thread vipul singh
Hello all,

Are there any recommendations on using a backoff when experiencing
checkpointing failures?
What we have seen is that when a checkpoint expires, the next
checkpoint doesn't care about the previous failure and starts soon after.
We experimented with *min_pause_between_checkpoints*, however that seems
to only apply after successful checkpoints (the same is discussed in an
earlier thread on this list).
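For reference, this is roughly the configuration we experimented with (the
intervals are placeholders):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L) // checkpoint every minute

// this pause is only applied after a *successful* checkpoint; an expired or
// failed checkpoint is followed immediately by the next attempt, which is the
// gap we are trying to work around
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000L)
env.getCheckpointConfig.setCheckpointTimeout(120000L)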

Are there any recommendations on how to have a backoff, or is there
something in the works to add a backoff in case of checkpointing failures? This
seems very valuable when checkpointing to an external location like
s3, where one can potentially be throttled or get errors like
TooBusyException from s3 (for example, like in this jira).

Please let us know!
Thanks,
Vipul


Exception on running an Elasticpipe flink connector

2018-01-03 Thread vipul singh
Hello,

We are working on a Flink ES connector, sourcing from a kafka stream and
sinking data into elasticsearch. The code works fine in IntelliJ, but while
running it on EMR (version 5.9, which uses flink 1.3.2) using
flink-yarn-session, we are seeing this exception:

Using the parallelism provided by the remote cluster (1). To use
another parallelism, set it at the ./bin/flink client.

Starting execution of program

2018-01-02 23:19:16,217 INFO  org.apache.flink.yarn.YarnClusterClient
 - Starting program in interactive mode



 The program finished with the following exception:

java.lang.NoSuchMethodError:
io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;

at 
org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)

at 
org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)

at 
org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)

at 
org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)

at 
org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)

at 
org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)

at 
org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)

at 
org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)

at 
org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)

at 
org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)

at 
org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)

at 
org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)

On searching online, it seems like this may be due to netty version
conflicts.
However, when we ran a dependency tree on our pom, we don't see netty
coming from anywhere else but flink:
https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06

Could you please suggest how we can resolve this error?
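One workaround we are considering (not verified yet) is relocating netty inside our
fat jar with the maven-shade-plugin, roughly along these lines, so that the
Elasticsearch client's netty 4 cannot clash with whatever netty version is already
on the cluster classpath:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- move io.netty into our own namespace inside the uber jar -->
          <relocation>
            <pattern>io.netty</pattern>
            <shadedPattern>shaded.flink.connector.io.netty</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>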

Thanks,
Vipul


Re: Pending parquet file with Bucking Sink

2017-12-18 Thread vipul singh
Hi Tao,

Is checkpointing enabled in your app? The pending files are only moved to
the finished state once the checkpoint that covers them completes.

Please take a look at the BucketingSink documentation:
"If checkpointing is not enabled the pending files will never be moved to
the finished state"
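In other words, the job needs something like the following (the interval is just a
placeholder); without it the sink never gets a checkpoint-complete notification and
the files stay in .pending:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// without this, BucketingSink never finalizes its part files
env.enableCheckpointing(60000L)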

Thanks,
Vipul



On Mon, Dec 18, 2017 at 4:30 PM, Tao Xia  wrote:

> Hi All,
>   Do you guys write parquet file using Bucking Sink? I run into an issue
> with all the parquet files are in the pending status.  Any ideas?
>
> processedStream is a DataStream of NDEvent.
>
> Output files are all like this one "_part-0-0.pending"
>
> val parquetSink = new BucketingSink[NDEvent]("/tmp/")
> parquetSink.setBucketer(new DateTimeBucketer[NDEvent]("-MM-dd/HH"))
> parquetSink.setWriter(new SinkParquetWriter(NDEvent.getClassSchema.toString))
> processedStream.addSink(parquetSink)
>
> public class SinkParquetWriter implements Writer {
>
> transient ParquetWriter writer = null;
> String schema = null;
>
> public SinkParquetWriter(String schema) {
> this.writer = writer;
> this.schema = schema;
> }
>
> public void open(FileSystem fileSystem, Path path) throws IOException {
> writer = AvroParquetWriter.builder(path)
> .withSchema(new Schema.Parser().parse(schema))
> .withCompressionCodec(CompressionCodecName.SNAPPY)
> .build();
> }
>
> public long flush() throws IOException {
> return writer.getDataSize();
> }
>
> public long getPos() throws IOException {
> return writer.getDataSize();
> }
>
> public void close() throws IOException {
> writer.close();
> }
>
> public void write(T t) throws IOException {
> writer.write(t);
> }
>
> public Writer duplicate() {
> return new SinkParquetWriter(schema);
> }
> }
>
>
> Thanks,
> Tao
>



-- 
Thanks,
Vipul


Re: save points through REST API not supported ?

2017-12-06 Thread vipul singh
Hi Vishal,

Job cancellation with a savepoint can be done via the REST API:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#cancel-job-with-savepoint

Thanks,
Vipul

On Wed, Dec 6, 2017 at 10:56 AM, Vishal Santoshi 
wrote:

> One can submit jobs, upload jars, kill jobs etc very strange that you
> can’t do a save point ?
>
> Or am I missing something obvious ?
>
> Vishal
>



-- 
Thanks,
Vipul


Re: Questions about checkpoints/savepoints

2017-10-26 Thread vipul singh
As a followup to the above, is there a way to get the last checkpoint metadata
location inside the *notifyCheckpointComplete* method? I tried poking around,
but didn't see a way to achieve this. Or is there any other way to
save the actual checkpoint metadata location into a
datastore (dynamodb etc)?

We are looking to save the savepoint/externalized-checkpoint metadata
location in some storage space, so that we can pass this information to the
flink run command during recovery (thereby removing the possibility of any
read-after-write consistency issues arising out of listing file paths etc).
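For what it's worth, the only hook I have found so far is the CheckpointListener
interface, and it only exposes the checkpoint id, not the metadata path; the best I
can sketch is persisting the id (plus a path we reconstruct ourselves from the
configured checkpoint directory) to an external store. writeToExternalStore below is
a made-up helper:

import org.apache.flink.runtime.state.CheckpointListener
import org.apache.flink.streaming.api.functions.sink.SinkFunction

class CheckpointTrackingSink[T](checkpointBaseDir: String) extends SinkFunction[T]
    with CheckpointListener {

  override def invoke(value: T): Unit = {
    // normal sink logic goes here
  }

  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    // only the id is available here; any path would have to be reconstructed
    // from the configured checkpoint directory, which is the part I am unsure about
    writeToExternalStore(checkpointId, s"$checkpointBaseDir/chk-$checkpointId")
  }

  private def writeToExternalStore(id: Long, guessedPath: String): Unit = {
    // placeholder: persist to dynamodb or any other store
  }
}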

Thanks,
Vipul

On Tue, Oct 24, 2017 at 11:53 PM, vipul singh <neoea...@gmail.com> wrote:

> Thanks Aljoscha for the explanations. I was able to recover from the last
> externalized checkpoint, by using flink run -s  
>
> I am curious, are there any options to save the metadata file name to some
> other place like dynamo etc at the moment? The reason why I am asking is,
> for the end launcher code we are writing, we want to ensure if a flink job
> crashes, we can just start it from last known externalized checkpoint.
> In the present senario, we have to list the contents of the s3 bucket
> which saves the metadata, to see the last metadata before failure, and
> there might a window where
> we might run into read after write consistency of s3. Thoughts?
>
> On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> That distinction with externalised checkpoints is a bit of a pitfall and
>> I'm hoping that we can actually get rid of that distinction in the next
>> version or the version after that. With that change, all checkpoints would
>> always be externalised, since it's not really any noticeable overhead.
>>
>> Regarding read-after-write consistency, you should be fine since an the
>> "externalised checkpoint", i.e. the metadata, is only one file. If you know
>> the file-path (either from the Flink dashboard or by looking at the S3
>> bucket) you can restore from it.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 24. Oct 2017, at 08:22, vipul singh <neoea...@gmail.com> wrote:
>>
>> Thanks Tony, that was the issue. I was thinking that when we use Rocksdb
>> and provide an s3 path, it uses externalized checkpoints by default. Thanks
>> so much!
>>
>> I have one followup question. Say in above case, I terminate the cluster,
>> and since the metadata is on s3, and not on local storage, does flink avoid
>> read after write consistency of s3? Would it be a valid concern, or we
>> handle that case in externalized checkpoints as well, and dont deal with
>> file system operations while dealing with retrieving externalized
>> checkpoints on s3.
>>
>> Thanks,
>> Vipul
>>
>>
>>
>> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Did you enable externalized checkpoints? [1]
>>>
>>> Best,
>>> Tony Wei
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>>> 1.3/setup/checkpoints.html#externalized-checkpoints
>>>
>>> 2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>:
>>>
>>>> Thanks Aljoscha for the answer above.
>>>>
>>>> I am experimenting with savepoints and checkpoints on my end, so that
>>>> we built fault tolerant application with exactly once semantics.
>>>>
>>>> I have been able to test various scenarios, but have doubts about one
>>>> use case.
>>>>
>>>> My app is running on an emr cluster, and I am trying to test the case
>>>> when a emr cluster is terminated. I have read that
>>>> *state.checkpoints.dir *is responsible for storing metadata
>>>> information, and links to data files in
>>>> *state.backend.fs.checkpointdir.*
>>>>
>>>> For my application I have configured both
>>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*
>>>>
>>>> Also I have the following in my main app:
>>>>
>>>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>>>
>>>> val CHECKPOINT_LOCATION = 
>>>> s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>>>
>>>> val backend:RocksDBStateBackend =
>>>>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>>>
>>>> env.setStateBackend(backend)
>>>> env.getCheckpointConfig.setMinPauseBetweenCheck

Re: Questions about checkpoints/savepoints

2017-10-25 Thread vipul singh
Thanks Aljoscha for the explanations. I was able to recover from the last
externalized checkpoint by using flink run -s  

I am curious, are there any options to save the metadata file name to some
other place like dynamo etc at the moment? The reason why I am asking is,
for the end launcher code we are writing, we want to ensure that if a flink job
crashes, we can just start it from the last known externalized checkpoint.
In the present scenario, we have to list the contents of the s3 bucket which
stores the metadata to see the last metadata before failure, and there
might be a window where we run into the read-after-write consistency of s3.
Thoughts?

On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> That distinction with externalised checkpoints is a bit of a pitfall and
> I'm hoping that we can actually get rid of that distinction in the next
> version or the version after that. With that change, all checkpoints would
> always be externalised, since it's not really any noticeable overhead.
>
> Regarding read-after-write consistency, you should be fine since an the
> "externalised checkpoint", i.e. the metadata, is only one file. If you know
> the file-path (either from the Flink dashboard or by looking at the S3
> bucket) you can restore from it.
>
> Best,
> Aljoscha
>
>
> On 24. Oct 2017, at 08:22, vipul singh <neoea...@gmail.com> wrote:
>
> Thanks Tony, that was the issue. I was thinking that when we use Rocksdb
> and provide an s3 path, it uses externalized checkpoints by default. Thanks
> so much!
>
> I have one followup question. Say in above case, I terminate the cluster,
> and since the metadata is on s3, and not on local storage, does flink avoid
> read after write consistency of s3? Would it be a valid concern, or we
> handle that case in externalized checkpoints as well, and dont deal with
> file system operations while dealing with retrieving externalized
> checkpoints on s3.
>
> Thanks,
> Vipul
>
>
>
> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi,
>>
>> Did you enable externalized checkpoints? [1]
>>
>> Best,
>> Tony Wei
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.3/setup/checkpoints.html#externalized-checkpoints
>>
>> 2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>:
>>
>>> Thanks Aljoscha for the answer above.
>>>
>>> I am experimenting with savepoints and checkpoints on my end, so that we
>>> built fault tolerant application with exactly once semantics.
>>>
>>> I have been able to test various scenarios, but have doubts about one
>>> use case.
>>>
>>> My app is running on an emr cluster, and I am trying to test the case
>>> when a emr cluster is terminated. I have read that
>>> *state.checkpoints.dir *is responsible for storing metadata
>>> information, and links to data files in
>>> *state.backend.fs.checkpointdir.*
>>>
>>> For my application I have configured both
>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*
>>>
>>> Also I have the following in my main app:
>>>
>>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>>
>>> val CHECKPOINT_LOCATION = 
>>> s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>>
>>> val backend:RocksDBStateBackend =
>>>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>>
>>> env.setStateBackend(backend)
>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>>> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>>>
>>>
>>> In the application startup logs I can see
>>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir, *values
>>> being loaded. However when the checkpoint happens I dont see any content in
>>> the metadata dir. Is there something I am missing? Please let me know. I am
>>> using flink version 1.3
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>>
>>> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink does not rely on file system operations to list contents, all
>>>> necessary file paths are stored in the meta data file, as you guessed. This
>>>> is the reason savepoints also work with file syste

Re: Questions about checkpoints/savepoints

2017-10-24 Thread vipul singh
Thanks Tony, that was the issue. I was thinking that when we use Rocksdb
and provide an s3 path, it uses externalized checkpoints by default. Thanks
so much!
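For anyone else who hits this: the piece I was missing was enabling externalized
checkpoints explicitly (this is on flink 1.3), roughly:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L) // same checkpoint interval as before, value is a placeholder
// RocksDB plus an s3 checkpoint path alone is not enough: the metadata directory
// (state.checkpoints.dir) is only populated once externalized checkpoints are enabled
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)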

I have one followup question. Say in the above case I terminate the cluster;
since the metadata is on s3 and not on local storage, does flink avoid the
read-after-write consistency issue of s3? Would it be a valid concern, or is
that case handled for externalized checkpoints as well, so that we don't rely on
file system listing operations when retrieving externalized
checkpoints from s3?

Thanks,
Vipul



On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi,
>
> Did you enable externalized checkpoints? [1]
>
> Best,
> Tony Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/checkpoints.html#externalized-checkpoints
>
> 2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>:
>
>> Thanks Aljoscha for the answer above.
>>
>> I am experimenting with savepoints and checkpoints on my end, so that we
>> built fault tolerant application with exactly once semantics.
>>
>> I have been able to test various scenarios, but have doubts about one use
>> case.
>>
>> My app is running on an emr cluster, and I am trying to test the case
>> when a emr cluster is terminated. I have read that
>> *state.checkpoints.dir *is responsible for storing metadata information,
>> and links to data files in *state.backend.fs.checkpointdir.*
>>
>> For my application I have configured both
>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*
>>
>> Also I have the following in my main app:
>>
>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>
>> val CHECKPOINT_LOCATION = 
>> s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>
>> val backend:RocksDBStateBackend =
>>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>
>> env.setStateBackend(backend)
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>>
>>
>> In the application startup logs I can see
>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir, *values
>> being loaded. However when the checkpoint happens I dont see any content in
>> the metadata dir. Is there something I am missing? Please let me know. I am
>> using flink version 1.3
>>
>> Thanks,
>> Vipul
>>
>>
>>
>> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Flink does not rely on file system operations to list contents, all
>>> necessary file paths are stored in the meta data file, as you guessed. This
>>> is the reason savepoints also work with file systems that "only" have
>>> read-after-write consistency.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>>>
>>> Thanks Stefan for the answers above. These are really helpful.
>>>
>>> I have a few followup questions:
>>>
>>>1. I see my savepoints are created in a folder, which has a
>>>_metadata file and another file. Looking at the code
>>>
>>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
>>>it seems like the metadata file contains tasks states, operator
>>>state and master states
>>>
>>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
>>>What is the purpose of the other file in the savepoint folder? My guess 
>>> is
>>>it should be a checkpoint file?
>>>2. I am planning to use s3 as my state backend, so want to ensure
>>>that application restarts are not affected by read-after-write 
>>> consistency
>>>of s3( if I use s3 as a savepoint backend). I am curious how flink 
>>> restores
>>>data from the _metadata file, and the other file? Does the _metadata file
>>>contain path to these other files? or would it do a listing on the s3
>>>folder?
>>>
>>>
>>> Please let me know,
>>>
>>> Thanks

Re: Questions about checkpoints/savepoints

2017-10-23 Thread vipul singh
Thanks Aljoscha for the answer above.

I am experimenting with savepoints and checkpoints on my end, so that we can
build a fault-tolerant application with exactly-once semantics.

I have been able to test various scenarios, but have doubts about one use
case.

My app is running on an EMR cluster, and I am trying to test the case when
the EMR cluster is terminated. I have read that *state.checkpoints.dir* is
responsible for storing the metadata information, with links to the data files in
*state.backend.fs.checkpointdir*.

For my application I have configured both
*state.backend.fs.checkpointdir* and *state.checkpoints.dir*

Also I have the following in my main app:

env.enableCheckpointing(CHECKPOINT_TIME_MS)

val CHECKPOINT_LOCATION =
s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"

val backend:RocksDBStateBackend =
  new RocksDBStateBackend(CHECKPOINT_LOCATION)

env.setStateBackend(backend)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)


In the application startup logs I can see the *state.backend.fs.checkpointdir*
and *state.checkpoints.dir* values being loaded. However, when the
checkpoint happens I don't see any content in the metadata dir. Is there
something I am missing? Please let me know. I am using flink version 1.3.

Thanks,
Vipul



On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> Flink does not rely on file system operations to list contents, all
> necessary file paths are stored in the meta data file, as you guessed. This
> is the reason savepoints also work with file systems that "only" have
> read-after-write consistency.
>
> Best,
> Aljoscha
>
>
> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>
> Thanks Stefan for the answers above. These are really helpful.
>
> I have a few followup questions:
>
>1. I see my savepoints are created in a folder, which has a _metadata
>file and another file. Looking at the code
>
> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
>it seems like the metadata file contains tasks states, operator state
>and master states
>
> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
>What is the purpose of the other file in the savepoint folder? My guess is
>it should be a checkpoint file?
>2. I am planning to use s3 as my state backend, so want to ensure that
>application restarts are not affected by read-after-write consistency of
>s3( if I use s3 as a savepoint backend). I am curious how flink restores
>data from the _metadata file, and the other file? Does the _metadata file
>contain path to these other files? or would it do a listing on the s3
>folder?
>
>
> Please let me know,
>
> Thanks,
> Vipul
>
> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> I have answered your questions inline:
>>
>>
>>1. It seems to me that checkpoints can be treated as flink internal
>>recovery mechanism, and savepoints act more as user-defined recovery
>>points. Would that be a correct assumption?
>>
>> You could see it that way, but I would describe savepoints more as
>> user-defined *restart* points than *recovery* points. Please take a look at
>> my answers in this thread, because they cover most of your question:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html .
>>
>>
>>1. While cancelling an application with -s option, it specifies the
>>savepoint location. Is there a way during application startup to identify
>>the last know savepoint from a folder by itself, and restart from there.
>>Since I am saving my savepoints on s3, I want to avoid issues arising from
>>*ls* command on s3 due to read-after-write consistency of s3.
>>
>> I don’t think that this feature exists, you have to specify the savepoint.
>>
>>
>>1. Suppose my application has a checkpoint at point t1, and say i
>>cancel this application sometime in future before the next available
>>checkpoint( say t1+x). If I start the application without specifying the
>>savepoint, it will start from the last known checkpoint(at

Re: Custom Sink Checkpointing errors

2017-10-22 Thread vipul singh
Thanks Stefan. I found the issue in my application. Everything is working
as expected now.
Once again thanks for the help and advice.

On Fri, Oct 20, 2017 at 4:51 AM, vipul singh <neoea...@gmail.com> wrote:

> Thanks Stefan for the answers. The serialization is happening during the
> creation of snapshot state. I have added a gist with a larger stacktrace(
> https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am
> not using any serializer, in the custom sink.
>
> We have
>
> src.keyBy(m => (m.topic, m.partition))
> .map(message => updateMessage(message, config))
> .addSink(new 
> CustomSink(config)).uid(FLINK_JOB_ID).setParallelism(src.parallelism)
> .name(FLINK_JOB_ID)
>
> So there should be a 1-1 source and sink mapping, i am assuming.
>
> If possible could you could please give some more pointers to help
> troubleshoot
>
> Thanks,
> Vipul
>
>
> On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> the crash looks unrelated to Flink code from the dump’s trace. Since it
>> happens somewhere in managing a jar file, it might be related to this:
>> https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your
>> jar gets overwritten while running, e.g. from your IDE?
>>
>> The serialization exception looks like the custom sink is using the same
>> serializer in different threads concurrently. I don’t have the full custom
>> code but this would be my guess. Ensure to duplicate serializers whenever
>> different threads could work on them, e.g. processing vs checkpointing.
>>
>> Best,
>> Stefan
>>
>>
>>
>>
>> Am 20.10.2017 um 14:24 schrieb vipul singh <neoea...@gmail.com>:
>>
>> Hello all,
>>
>> I am working on a custom sink implementation, but having weird issues
>> with checkpointing.
>>
>> I am using a custom ListState to checkpoint, and it looks like this:
>>
>> private var checkpointMessages: ListState[Bucket] =_
>>
>>
>> My snapshot function looks like:
>>
>> @throws[IOException]
>> def snapshotState(context: FunctionSnapshotContext): Unit = {
>>   checkpointMessages.clear()
>>   for((bucketName, bucket) <- bufferedMessages) {
>>
>> // cloning to avoid any conncurrent modification issues
>> var new_buffer = new ListBuffer[GenericRecord]()
>>
>> bucket.buffer.foreach(f=> new_buffer += f)
>>
>> val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>>
>> if(shouldUpload(bucketName)) uploadFile (bucketName)
>> else checkpointMessages.add(new_bucket)
>>   }}
>>
>> where class bucket is:
>>
>> @SerialVersionUID(1L)
>> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var 
>> timestamp: Long) extends Serializable{
>>   def this(name: String) = {
>> this(name, ListBuffer[GenericRecord](), new Date().getTime)
>>   }
>> }
>>
>>
>> BufferredMessages signature is
>>
>> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>>
>>
>> The basic idea behind this implementation is I maintain multiple buffers,
>> and push messages(org.apache.avro.generic.GenericRecord) during the
>> @invoke section of the sink, upon reaching certain thresholds I archive
>> these on s3.
>>
>> I try to run this both locally in intellij and on a cluster:
>>
>> On Intellij the process runs for a bit( checkpoints 3-4 times) and then
>> error out with the exception below:
>>
>>
>> # A fatal error has been detected by the Java Runtime Environment:
>> #
>> #  SIGSEGV (0xb) at pc=0x00010d46440c, pid=25232,
>> tid=0x3903
>> #
>> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build
>> 1.8.0_131-b11)
>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode
>> bsd-amd64 compressed oops)
>> # Problematic frame:
>> # V  [libjvm.dylib+0x46440c]
>> #
>> # Core dump written. Default location: /cores/core or core.25232
>> #
>> # An error report file with more information is saved as:
>> # hs_err_pid25232.log
>> #
>> # If you would like to submit a bug report, please visit:
>> #   http://bugreport.java.com/bugreport/crash.jsp
>> # The crash happened outside the Java Virtual Machine in native code.
>> # See problematic frame for where to report the bug.
>> #
>> Disconnected from the target VM, address: '127.0.0.1:60979', tra

Re: Custom Sink Checkpointing errors

2017-10-20 Thread vipul singh
Thanks Stefan for the answers. The serialization is happening during the
creation of the snapshot state. I have added a gist with a larger stacktrace
(https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am
not using any serializer in the custom sink.

We have

src.keyBy(m => (m.topic, m.partition))
.map(message => updateMessage(message, config))
.addSink(new 
CustomSink(config)).uid(FLINK_JOB_ID).setParallelism(src.parallelism)
.name(FLINK_JOB_ID)

So there should be a 1-1 source and sink mapping, I am assuming.

If possible, could you please give some more pointers to help
troubleshoot?

Thanks,
Vipul


On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <s.rich...@data-artisans.com
> wrote:

> Hi,
>
> the crash looks unrelated to Flink code from the dump’s trace. Since it
> happens somewhere in managing a jar file, it might be related to this:
> https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your
> jar gets overwritten while running, e.g. from your IDE?
>
> The serialization exception looks like the custom sink is using the same
> serializer in different threads concurrently. I don’t have the full custom
> code but this would be my guess. Ensure to duplicate serializers whenever
> different threads could work on them, e.g. processing vs checkpointing.
>
> Best,
> Stefan
>
>
>
>
> Am 20.10.2017 um 14:24 schrieb vipul singh <neoea...@gmail.com>:
>
> Hello all,
>
> I am working on a custom sink implementation, but having weird issues with
> checkpointing.
>
> I am using a custom ListState to checkpoint, and it looks like this:
>
> private var checkpointMessages: ListState[Bucket] =_
>
>
> My snapshot function looks like:
>
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>   for((bucketName, bucket) <- bufferedMessages) {
>
> // cloning to avoid any conncurrent modification issues
> var new_buffer = new ListBuffer[GenericRecord]()
>
> bucket.buffer.foreach(f=> new_buffer += f)
>
> val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>
> if(shouldUpload(bucketName)) uploadFile (bucketName)
> else checkpointMessages.add(new_bucket)
>   }}
>
> where class bucket is:
>
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var 
> timestamp: Long) extends Serializable{
>   def this(name: String) = {
> this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
>
>
> BufferredMessages signature is
>
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>
>
> The basic idea behind this implementation is I maintain multiple buffers,
> and push messages(org.apache.avro.generic.GenericRecord) during the
> @invoke section of the sink, upon reaching certain thresholds I archive
> these on s3.
>
> I try to run this both locally in intellij and on a cluster:
>
> On Intellij the process runs for a bit( checkpoints 3-4 times) and then
> error out with the exception below:
>
>
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x00010d46440c, pid=25232,
> tid=0x3903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build
> 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode
> bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979', transport:
> 'socket'
>
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>
> I managed to collect a core dump: https://gist.github.com/neoeahit/
> 38a02955c1de7501561fba2e593d5f6a.
>
> On a cluster I start to set concurrent serialization issues:
> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>
> My initial guess is this is happening due to the size of the ListState?
> but i checked the number of records are around ~10k in the buffer. Due to
> the nature of the application, we have to implement this in a custom sink.
>
> Could someone please help me/ guide me to troubleshoot this further.
>
> --
> Thanking in advance,
> Vipul
>
>
>


-- 
Thanks,
Vipul


Custom Sink Checkpointing errors

2017-10-20 Thread vipul singh
Hello all,

I am working on a custom sink implementation, but having weird issues with
checkpointing.

I am using a custom ListState to checkpoint, and it looks like this:

private var checkpointMessages: ListState[Bucket] = _


My snapshot function looks like:

@throws[IOException]
def snapshotState(context: FunctionSnapshotContext): Unit = {
  checkpointMessages.clear()
  for((bucketName, bucket) <- bufferedMessages) {

// cloning to avoid any concurrent modification issues
var new_buffer = new ListBuffer[GenericRecord]()

bucket.buffer.foreach(f=> new_buffer += f)

val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)

if(shouldUpload(bucketName)) uploadFile (bucketName)
else checkpointMessages.add(new_bucket)
  }}

where class bucket is:

@SerialVersionUID(1L)
class Bucket(var name: String, var buffer: ListBuffer[GenericRecord],
var timestamp: Long) extends Serializable{
  def this(name: String) = {
this(name, ListBuffer[GenericRecord](), new Date().getTime)
  }
}


BufferredMessages signature is

private val bufferedMessages = collection.mutable.Map[String, Bucket]()


The basic idea behind this implementation is I maintain multiple buffers,
and push messages(org.apache.avro.generic.GenericRecord) during the @invoke
section of the sink, upon reaching certain thresholds I archive these on s3.

I try to run this both locally in intellij and on a cluster:

On Intellij the process runs for a bit( checkpoints 3-4 times) and then
error out with the exception below:


# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00010d46440c, pid=25232, tid=0x3903
#
# JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build
1.8.0_131-b11)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode
bsd-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x46440c]
#
# Core dump written. Default location: /cores/core or core.25232
#
# An error report file with more information is saved as:
# hs_err_pid25232.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#
Disconnected from the target VM, address: '127.0.0.1:60979', transport:
'socket'

Process finished with exit code 134 (interrupted by signal 6: SIGABRT)

I managed to collect a core dump:
https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a.

On a cluster I start to see concurrent serialization issues:
https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47

My initial guess was that this is happening due to the size of the ListState, but
I checked and the number of records in the buffer is only around ~10k. Due to the
nature of the application, we have to implement this in a custom sink.

Could someone please help me/ guide me to troubleshoot this further.

-- 
Thanking in advance,
Vipul


Re: Questions about checkpoints/savepoints

2017-10-09 Thread vipul singh
Thanks Stefan for the answers above. These are really helpful.

I have a few followup questions:

   1. I see my savepoints are created in a folder, which has a _metadata
   file and another file. Looking at the code
   <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
   it seems like the metadata file contains task states, operator state
   and master states
   <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
   What is the purpose of the other file in the savepoint folder? My guess is
   it should be a checkpoint file?
   2. I am planning to use s3 as my state backend, so I want to ensure that
   application restarts are not affected by the read-after-write consistency of
   s3 (if I use s3 as a savepoint backend). I am curious how flink restores
   data from the _metadata file and the other file? Does the _metadata file
   contain paths to these other files? or would it do a listing on the s3
   folder?


Please let me know,

Thanks,
Vipul

On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter  wrote:

> Hi,
>
> I have answered your questions inline:
>
>
>1. It seems to me that checkpoints can be treated as flink internal
>recovery mechanism, and savepoints act more as user-defined recovery
>points. Would that be a correct assumption?
>
> You could see it that way, but I would describe savepoints more as
> user-defined *restart* points than *recovery* points. Please take a look at
> my answers in this thread, because they cover most of your question:
>
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html .
>
>
>1. While cancelling an application with -s option, it specifies the
>savepoint location. Is there a way during application startup to identify
>the last know savepoint from a folder by itself, and restart from there.
>Since I am saving my savepoints on s3, I want to avoid issues arising from
>*ls* command on s3 due to read-after-write consistency of s3.
>
> I don’t think that this feature exists, you have to specify the savepoint.
>
>
>1. Suppose my application has a checkpoint at point t1, and say i
>cancel this application sometime in future before the next available
>checkpoint( say t1+x). If I start the application without specifying the
>savepoint, it will start from the last known checkpoint(at t1), which wont
>have the application state saved, since I had cancelled the application.
>Would this is a correct assumption?
>
> If you restart a canceled application it will not consider checkpoints.
> They are only considered in recovery on failure. You need to specify a
> savepoint or externalized checkpoint for restarts to make explicit that you
> intend to restart a job, and not to run a new instance of the job.
>
>
>1. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be
>same as manually saving regular savepoints?
>
> Not the same, because checkpoints and savepoints are different in certain
> aspects, but both methods leave you with something that survives job
> cancelation and can be used to restart from a certain state.
>
> Best,
> Stefan
>
>


-- 
Thanks,
Vipul


Weird error in submitting a flink job to yarn cluster

2017-10-03 Thread vipul singh
Hello,

I am working on a ParquetSink writer, which will convert a kafka stream to
parquet format. I am having some weird issues deploying this application
to a yarn cluster. I am not 100% sure this is a flink-related
error, but I wanted to reach out to folks here in case it might be.


If I launch Flink within YARN only for executing a single job, it runs ok.
This is the command I use for the deployment:

*Command:* *flink run  --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2
-yn 2 -d -c  jar_name.jar*

However, as soon as I try to submit a similar job to an already running yarn
cluster, I start to get these errors
(https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57) and the
application crashes. I checked the location in /tmp where I am creating
the file, and there is no file existing there.

*Command:* *flink run -yid application_id -d -c  jar_name.jar *


A bit more about my algorithm: I use a temp array to buffer messages in the
@invoke method, and when specific thresholds are reached I create a parquet
file with this buffered data. Once a tmp parquet file is created, I upload
this file to long-term storage.

The code to write buffered data to a parquet file is:

 writer = Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
   .withSchema(schema.get)
   .withCompressionCodec(compressionCodecName)
   .withRowGroupSize(blockSize)
   .withPageSize(pageSize)
   .build())
bufferedMessages.foreach { e =>
  writer.get.write(e.payload)
}
writer.get.close()


Please do let me know.

Thanking in advance,
- Vipul


Questions about checkpoints/savepoints

2017-09-25 Thread vipul singh
Hello,

I have some confusion about checkpoints vs savepoints, and how to use them
effectively in my application.

I am working on an application which relies on flink's fault-tolerance
mechanism to ensure exactly-once semantics. I have enabled external
checkpointing in my application as below:

env.enableCheckpointing(CHECKPOINT_TIME_MS)

env.setStateBackend(new RocksDBStateBackend(CHECKPOINT_LOCATION))

env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)

Please correct me if I am wrong, but the above ensures that if the
application crashes, it is able to recover from the last known location.
This however won't work if we cancel the application (for new
deployments/restarts).

Reading the documentation about savepoints hints that it should be good
practice to take savepoints at regular intervals of time (by crons
etc) so that the application can be restarted from a last known location.
It also points to using the command line option (-s) to cancel an
application, so that the application stops after saving a savepoint (a sketch
of the commands I mean follows the questions below). Based
on the above understanding I have some questions:

Questions:

   1. It seems to me that checkpoints can be treated as flink internal
   recovery mechanism, and savepoints act more as user-defined recovery
   points. Would that be a correct assumption?
   2. While cancelling an application with the -s option, it specifies the
   savepoint location. Is there a way during application startup to identify
   the last known savepoint from a folder by itself, and restart from there?
   Since I am saving my savepoints on s3, I want to avoid issues arising from
   an *ls* command on s3 due to the read-after-write consistency of s3.
   3. Suppose my application has a checkpoint at point t1, and say I cancel
   this application sometime in the future before the next available checkpoint
   (say t1+x). If I start the application without specifying the savepoint, it
   will start from the last known checkpoint (at t1), which won't have the
   latest application state saved, since I had cancelled the application. Would
   this be a correct assumption?
   4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the
   same as manually saving regular savepoints?
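For reference, the cancel/restore flow I am referring to in questions 2 and 3 looks
roughly like this (paths and ids are placeholders):

# cancel the job and take a savepoint
bin/flink cancel -s s3://my-bucket/savepoints <job-id>

# later, restart the job from that savepoint
bin/flink run -s s3://my-bucket/savepoints/savepoint-abc123 my-app.jar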


Please let me know.

Thanks,
Vipul


Re: Issues in recovering state from last crash using custom sink

2017-08-28 Thread vipul singh
Hi Aljoscha,

Yes.
I am running the application until a few checkpoints are complete. I am
stopping the application between two checkpoints, so there will be messages
in the list state, which should be checkpointed when *snapshotState* is called.
I am able to see a checkpoint file on S3 (I am saving the checkpoints on s3
using the RocksDB state backend). On restarting the application, I add a debug point here
<https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L49>,
to see if there are any messages in checkpointedMessages, but the list is empty
(screenshot omitted).

Do you think there might be an error in the way I am trying to retrieve
messages?


def snapshotState(context: FunctionSnapshotContext) {
  checkpointedMessages.clear()
  bufferredMessages.foreach(checkpointedMessages.add)
  pendingFiles synchronized {
    if (pendingFiles.nonEmpty) {
      // we have a list of pending files
      // we move all files to S3 (that's the sink in our case)
      // and after that we delete these files
    }
    pendingFiles.clear()
  }
}

def initializeState(context: FunctionInitializationContext) {
  // Check if files already exist in /tmp;
  // this might be the case if the program crashed before these files were uploaded to s3.
  // We need to recover (upload these files to S3 and clear the directory)
  handlePreviousPendingFiles()
  checkpointedMessages = context.getOperatorStateStore.getListState(
    new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))
  import scala.collection.JavaConversions._
  for (message <- checkpointedMessages.get) {
    bufferredMessages.add(message)
  }
}

From my understanding of the above code, upon checkpointing, the messages contained in
checkpointedMessages are in the snapshot, and when *initializeState* is called, it will
try to recover these messages from the last checkpoint?
Do you think the error is in the way I am trying to get the last checkpointed ListBuffer
elements?

checkpointedMessages = context.getOperatorStateStore.getListState(
  new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))

Please let me know!

Thanks,
Vipul

On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> How are you testing the recovery behaviour? Are you taking a savepoint
> ,then shutting down, and then restarting the Job from the savepoint?
>
> Best,
> Aljoscha
>
> On 28. Aug 2017, at 00:28, vipul singh <neoea...@gmail.com> wrote:
>
> Hi all,
>
> I am working on a flink archiver application. In a gist this application
> tries to reads a bunch of schematized messages from kafka and archives them
> to s3. Due to the nature of the naming of the files, I had to go towards a
> custom sink implementation. As of the current progress the application in
> general is able to archive files to s3 ok.
> I am having some issues during the recovery phase. A sample of the code
> can be found on link
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe>. My
> issue is on recovery when initializeState is called, it is not able to
> get(recover) the last checkpointed ListState( i.e. checkpointedMessages
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L47>
>  is
> 0). I think this might be because of the way I am retrieving the
> checkpointed messages. Could someone please point me to what is wrong? or
> direct me to some examples which do a similar thing( Please note Message
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L2>
>  class
> is our own implementation)
>
> Thanks,
> Vipul
>
>
>


-- 
Thanks,
Vipul


Issues in recovering state from last crash using custom sink

2017-08-27 Thread vipul singh
Hi all,

I am working on a flink archiver application. In a gist, this application
reads a bunch of schematized messages from kafka and archives them
to s3. Due to the nature of the naming of the files, I had to go towards a
custom sink implementation. As of the current progress the application in
general is able to archive files to s3 ok.
I am having some issues during the recovery phase. A sample of the code can
be found at https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe. My
issue is that on recovery, when initializeState is called, it is not able to
get (recover) the last checkpointed ListState (i.e. checkpointedMessages is
0). I think this might be because of the way I am retrieving the
checkpointed messages. Could someone please point me to what is wrong? Or
direct me to some examples which do a similar thing (please note the Message
class is our own implementation).

Thanks,
Vipul