Re: RocksDB

2020-03-10 Thread Aljoscha Krettek

On 10.03.20 11:36, Timothy Victor wrote:

> Can the RocksDB state backend used by Flink be queried from outside, e.g.
> via SQL?


That's not possible, but you might be interested in queryable state: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html



> Or maybe a better question: is there a RocksDB SinkFunction that exists?


I'm afraid that doesn't exist, and it probably never will, because
RocksDB is not a distributed system, so writing to it from a
parallel stream processing application would most likely not work well.


Best,
Aljoscha


Re: RocksDB

2020-03-10 Thread David Anderson
The State Processor API goes a bit in the direction you are asking about, by
making it possible to query savepoints.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html


Regards,
David




Re: RocksDB Statebackend

2016-04-12 Thread Aljoscha Krettek
Hi,
I'm going to try and respond to each point:

1. This seems strange. Could you give some background on parallelism, the
number of operators with state, and so on? Also, I'm assuming you are using
the partitioned state abstraction, i.e. getState(), correct?

2. Your observations are pretty much correct. RocksDB is slower because the
FsStateBackend basically stores the state in a Java HashMap and writes the
contents to HDFS when checkpointing, whereas RocksDB stores data in on-disk
files and goes to them for every state access (of course there are caches,
but generally it is like this). I'm actually impressed that it is still this
fast in comparison.

3. See 1. (I think, for now.)

4. The checkpointing time is the time from the JobManager deciding to start
a checkpoint until all tasks have confirmed that checkpoint. I have seen
this before and I think it results from back pressure. The problem is that
the checkpoint messages that we send through the topology are sitting at
the sources because they are also back pressured by the slow processing of
normal records. You should be able to see the actual checkpointing times
(both synchronous and asynchronous) in the log files of the task managers;
they should be much lower.
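
To make point 2 more concrete, here is a minimal sketch in plain Java. It is
not Flink or RocksDB code: HeapStore and SerializedStore are hypothetical
names, and the in-memory map of byte arrays only stands in for an on-disk
store. The assumption it illustrates is that a byte-oriented store has to
encode the key and decode or encode the value on every access, while
heap-based state works on live objects.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: none of these classes exist in Flink or RocksDB.
final class StateAccessSketch {

    /** Heap-style state: values stay live Java objects between accesses. */
    static final class HeapStore {
        private final Map<String, Long> state = new HashMap<>();

        long increment(String key) {
            return state.merge(key, 1L, Long::sum);
        }
    }

    /** Byte-oriented state: every access pays for encoding and decoding. */
    static final class SerializedStore {
        // Stand-in for an on-disk store; keys and values are raw bytes.
        private final Map<ByteBuffer, byte[]> store = new HashMap<>();
        private long codecCalls = 0;

        long increment(String key) {
            // Encode the key to bytes before the lookup.
            ByteBuffer rawKey = ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
            codecCalls++;

            // Decode the current value, if there is one.
            long current = 0L;
            byte[] rawValue = store.get(rawKey);
            if (rawValue != null) {
                current = ByteBuffer.wrap(rawValue).getLong();
                codecCalls++;
            }

            // Encode the updated value and write it back.
            long next = current + 1;
            store.put(rawKey, ByteBuffer.allocate(Long.BYTES).putLong(next).array());
            codecCalls++;
            return next;
        }

        /** Number of encode/decode steps performed so far. */
        long codecCalls() {
            return codecCalls;
        }
    }
}
```

Both stores return the same counts; the difference is the extra encoding work
per access, which a real on-disk store adds to on top of any file or cache
lookups.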

I can go into more detail later; I'm just writing this quickly before calling
it a day. :-)

Cheers,
Aljoscha

On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf 
wrote:

> Hi everyone,
>
> my experience with the RocksDBStateBackend has left me a little bit
> confused. Maybe you guys can confirm that my experience is the expected
> behaviour ;):
>
> I have run a performance test twice, once with the FsStateBackend and once
> with the RocksDBStateBackend for comparison. In this particular test the
> state saved is generally not large (in a production scenario it will be
> larger).
>
> These are my observations:
>
> 1. Minimal checkpoint size (no records) with RocksDB was 33MB compared
> to <<1MB with the FsStateBackend.
>
> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>
> 3. Checkpoint sizes as reported in the Dashboard were ca. 1MB for
> FsStateBackend but >100MB for RocksDBStateBackend. I hope the difference
> gets smaller for very large state. Can you confirm?
>
> 4. Checkpointing times as reported in the Dashboard were 26 secs for
> RocksDB during the test and <1 second for FsStateBackend. Does the
> reported time correspond to the synchronous + asynchronous part of the
> checkpointing in case of RocksDB? Is there any way to tell how long the
> synchronous part takes?
>
> From these first observations RocksDB does seem to bring a large
> overhead for state < 1GB, I guess? Is this expected?
>
> Cheers,
>
> Konstantin
>


Re: RocksDB Statebackend

2016-04-12 Thread Stephan Ewen
Concerning the size of RocksDB snapshots: I am wondering if RocksDB simply
does not compact for a long time, thus leaving a lot of stale data in the
snapshot.

That would especially be the case if you have a lot of changing values for
the same set of keys.
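
The hypothesis above can be illustrated with a toy append-only log in plain
Java. This is only a sketch of the general idea that overwritten values stay
on disk until compaction; it is not how RocksDB is implemented, and
CompactionSketch with its methods is a hypothetical name. Size is counted in
characters to keep the example short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of an append-only store; not RocksDB's implementation.
final class CompactionSketch {
    // Each element is a {key, value} pair in write order; old versions are kept.
    private final List<String[]> log = new ArrayList<>();

    /** Appends a new version instead of overwriting the old one. */
    void put(String key, String value) {
        log.add(new String[] {key, value});
    }

    /** Returns the newest value for the key, or null if the key was never written. */
    String get(String key) {
        for (int i = log.size() - 1; i >= 0; i--) {
            if (log.get(i)[0].equals(key)) {
                return log.get(i)[1];
            }
        }
        return null;
    }

    /** Characters a snapshot would copy if it simply copied the whole log. */
    int snapshotSize() {
        int size = 0;
        for (String[] entry : log) {
            size += entry[0].length() + entry[1].length();
        }
        return size;
    }

    /** Rewrites the log so that only the newest version of each key remains. */
    void compact() {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] entry : log) {
            latest.put(entry[0], entry[1]);
        }
        log.clear();
        for (Map.Entry<String, String> entry : latest.entrySet()) {
            log.add(new String[] {entry.getKey(), entry.getValue()});
        }
    }
}
```

Writing three versions of the same key triples the snapshot size in this model
until compact() runs, which matches the "many changing values for the same set
of keys" case described above.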



Re: RocksDB Statebackend

2016-04-12 Thread Maxim
Is it possible to add an option to store the state in a Java HashMap and
write its content to RocksDB when checkpointing? For "hot" keys that are
updated very frequently, such an optimization would help with performance.

I know that you are also working on incremental checkpoints, which would
also be a big win for jobs with a large number of keys.

Thanks,

Maxim.



Re: RocksDB Statebackend

2016-04-13 Thread Aljoscha Krettek
Hi Maxim,
yes, the plan is to have a cache of hot values that uses the managed memory
abstraction of Flink, so that we can make sure that we stay within memory
bounds and don't run into OOM exceptions.
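
As a rough sketch of the idea discussed here, the following plain-Java class
keeps a bounded number of hot values on the heap and writes them back to a
slower store on eviction or on an explicit flush (for example, before a
checkpoint). Everything in it is an assumption for illustration:
HotValueCacheSketch is a hypothetical name, a HashMap stands in for RocksDB,
and the bound is a simple entry count rather than Flink's managed memory.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a bounded write-back cache; not Flink code.
final class HotValueCacheSketch {
    // Stand-in for the slow store (RocksDB in the discussion above).
    private final Map<String, Long> backingStore = new HashMap<>();
    private int backingWrites = 0;

    // Access-ordered map: the eldest entry is the least recently used one.
    private final LinkedHashMap<String, Long> hot;

    HotValueCacheSketch(final int maxEntries) {
        this.hot = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > maxEntries) {
                    // Write the evicted value back so it is not lost.
                    writeBack(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    /** Increments a counter, touching the backing store only on a cache miss. */
    long increment(String key) {
        Long current = hot.get(key);
        if (current == null) {
            current = backingStore.getOrDefault(key, 0L);
        }
        long next = current + 1;
        hot.put(key, next);
        return next;
    }

    /** Writes all cached values to the backing store, e.g. before a checkpoint. */
    void flush() {
        for (Map.Entry<String, Long> entry : hot.entrySet()) {
            writeBack(entry.getKey(), entry.getValue());
        }
    }

    /** Value currently persisted in the backing store, or null if none. */
    Long persisted(String key) {
        return backingStore.get(key);
    }

    int backingWrites() {
        return backingWrites;
    }

    private void writeBack(String key, long value) {
        backingStore.put(key, value);
        backingWrites++;
    }
}
```

With this design, repeated updates to a hot key cost no writes to the backing
store until the key is evicted or flush() is called. The price is that the
backing store is stale in between, so a checkpoint must flush first.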



Re: RocksDB Statebackend

2016-04-13 Thread Shannon Carey
This is something that my team and I have discussed building, so it's great to 
know that it's already on the radar. If we beat you to it, I'll definitely try 
to make it a contribution.

Shannon


Re: RocksDB Statebackend

2016-04-13 Thread Aljoscha Krettek
That's interesting to hear. If you want, we can also collaborate on that
one. Using the Flink managed memory for that purpose would require some
changes to lower layers of Flink.



Re: RocksDB Statebackend

2016-04-13 Thread Konstantin Knauf
Hi Aljoscha,

thanks for your answers. I am currently not in the office, so I cannot
run any further analysis until Monday. Just some quick answers to your
questions.

We are using the partitioned state abstraction; most of the state should
correspond to buffered events in windows. Parallelism is 9. In terms of
stateful operators we basically just have a KafkaSource, a custom
stateful trigger, and a RollingSink. Overall, in this test scenario
the state is very limited (see the size of the state using FsStateBackend).

I will get back to you once I have done some more experiments, which
will be in the course of next week.

Cheers,

Konstantin





Re: RocksDB segfaults

2017-03-22 Thread Stephan Ewen
Hi!

It looks like you are running version 1.1 of the RocksDB state backend (is an
old version still packaged into your JAR file?)

This line indicates that:
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
(the method no longer exists in 1.2)

Can you try running 1.2 and see if that still occurs? In general, I cannot
vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it has been stable.
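
One way to check which copy of a class the JVM actually loaded is to ask for
its code source and to look for a method by name. The sketch below uses only
standard JDK reflection; ClassOriginSketch and its two helpers are
hypothetical names, not part of Flink. Inside a job one could pass it the
class named above, loaded via Class.forName, to see whether it comes from the
user JAR and whether performSemiAsyncSnapshot is present.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical diagnostic helper built on standard JDK reflection only.
final class ClassOriginSketch {

    /** Location the class was loaded from, or "<bootstrap>" if the JVM reports none. */
    static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap>";
        }
        return source.getLocation().toString();
    }

    /** True if the class itself declares at least one method with the given name. */
    static boolean declaresMethod(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }
}
```

If the reported location is the job's own JAR rather than the Flink
distribution, an old state backend version bundled into that JAR would be a
plausible explanation for the stack trace.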

Stephan



On Wed, Mar 22, 2017 at 3:13 PM, Florian König 
wrote:

> Hi,
>
> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by
> something in RocksDB. What is the preferred way to report them? All I have
> at the moment are two hs_err_pid12345.log files. They are over 4000 lines
> long each. Is there anything significant that I should extract to help you
> guys and/or put into a JIRA ticket?
>
> The first thing that came to my mind was the stack traces (see below).
> Anything else?
>
> Thanks
> Florian
>
> 
>
> Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  free space=1019k
> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc [0x7fec92588780+0x4c]
> J 27241 C2 org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object; (78 bytes) @ 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
> J 38483 C2 org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V (114 bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
> J 38522 C2 org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult; (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
> J 47531 C2 org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V (453 bytes) @ 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
> J 5815 C2 akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V (7 bytes) @ 0x7fec91e3ae6c [0x7fec91e3adc0+0xac]
> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) @ 0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
> J 6628 C2 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V (60 bytes) @ 0x7fec9212d050 [0x7fec9212ccc0+0x390]
> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) @ 0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
> v  ~StubRoutines::call_stub
>
> --
>
> Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,  free space=1019k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> C  [libstdc++.so.6+0xc026b]  std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&)+0xb
> C  [librocksdbjni8426686507832168508.so+0x2f14ca]  rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions const&, rocksdb::BackupEngine**)+0x3a
> C  [librocksdbjni8426686507832168508.so+0x180ad5]  Java_org_rocksdb_BackupEngine_open+0x25
> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79aa36 [0x7f16cb79a980+0xb6]
> J 49809 C1 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap; (416 bytes) @ 0x7f16cd2733d4 [0x7f16cd2719a0+0x1a34]
> J 51766 C2 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap; (40 bytes) @ 0x7f16cb40a1fc [0x7f16cb40a1a0+0x5c]
> J 50547 C2 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState; (206 bytes) @ 0x7f16cb8be89c [0x7f16cb8be7e0+0xbc]
> J 52232 C2 org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z (650 bytes) @ 0x7f16cbfbbf60 [0x7f16cbfbb540+0xa20]
> J 52419 C2 org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V (25 bytes) @ 0x7f16cbdd2624 [0x7f16cbdd25c0+0x64]
> J 41649 C2 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(Lorg/apache/flink/streaming/api/operators/OneInputStreamOperator;Ljava/lang/Object;)Z (439 bytes) @ 0x7f16cc8aed5c [0x7f16cc8add40+0x101c]
> J 33374% C2 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V (42 bytes) @ 0x7f16cbdc21d0 [0x7f16cbdc20c0+0x110]

Re: RocksDB segfaults

2017-03-22 Thread Florian König
Hi Stephan,

you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
Sorry, my bad.

That leaves us with the first trace of a segfault for which I can guarantee 
that it brought down a 1.2.0 instance. Unfortunately I cannot reproduce the 
problem. It has happened twice so far, but I can’t see any pattern. Is there 
anything in the stack trace that could point us to a probable cause?

Florian

> On 22.03.2017 at 16:00, Stephan Ewen wrote:
> 
> Hi!
> 
> It looks like you are running the RocksDB state backend 1.1 (is an old 
> version still packaged into your JAR file?)
> 
> This line indicates that: 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
>  (the method no longer exists in 1.2)
> 
> Can you try and run 1.2 and see if that still occurs? In general, I cannot 
> vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it was stable.
> 
> Stephan
> 
> 
> 
> On Wed, Mar 22, 2017 at 3:13 PM, Florian König  
> wrote:
> Hi,
> 
> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something 
> in RocksDB. What is the preferred way to report them? All I got at the moment 
> are two hs_err_pid12345.log files. They are over 4000 lines long each. Is 
> there anything significant that I should extract to help you guys and/or put 
> into a JIRA ticket?
> 
> The first thing that came to my mind was the stack traces (see below). 
> Anything else?
> 
> Thanks
> Florian
> 
> 
> 
> Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  free 
> space=1019k
> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc 
> [0x7fec92588780+0x4c]
> J 27241 C2 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object;
>  (78 bytes) @ 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
> J 38483 C2 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V
>  (114 bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
> J 38522 C2 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
>  (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
> J 47531 C2 
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V
>  (453 bytes) @ 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
> J 5815 C2 
> akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V
>  (7 bytes) @ 0x7fec91e3ae6c [0x7fec91e3adc0+0xac]
> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) 
> @ 0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
> J 6628 C2 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V
>  (60 bytes) @ 0x7fec9212d050 [0x7fec9212ccc0+0x390]
> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) 
> @ 0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
> v  ~StubRoutines::call_stub
> 
> --
> 
> Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,  free 
> space=1019k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
> code)
> C  [libstdc++.so.6+0xc026b]  std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> >::basic_string(std::string const&)+0xb
> C  [librocksdbjni8426686507832168508.so+0x2f14ca]  
> rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions 
> const&, rocksdb::BackupEngine**)+0x3a
> C  [librocksdbjni8426686507832168508.so+0x180ad5]  
> Java_org_rocksdb_BackupEngine_open+0x25
> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79aa36 
> [0x7f16cb79a980+0xb6]
> J 49809 C1 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap;
>  (416 bytes) @ 0x7f16cd2733d4 [0x7f16cd2719a0+0x1a34]
> J 51766 C2 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap;
>  (40 bytes) @ 0x7f16cb40a1fc [0x7f16cb40a1a0+0x5c]
> J 50547 C2 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState;
>  (206 bytes) @ 0x7f16cb8be89c [0x7f16cb8be7e0+0xbc]
> J 52232 C2 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z 
> (650 bytes) @ 0x7f16cbfbbf60 [0x7f16cbfbb540+0xa20]
> J 52419 C2 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyChe

Re: RocksDB segfaults

2017-03-22 Thread Stefan Richter
Hi,

for the first stack trace, I assume that the backend is not accessed as part 
of processing an element, but by another thread. Is that correct? RocksDB 
requires accessing threads to hold the task’s checkpointing lock; otherwise 
they might call methods on an instance that has already been disposed. 
However, this should only happen when the task was already about to shut down 
anyway. Is that a plausible explanation for your observed behaviour? I also 
cannot rule out that segfaults can happen inside RocksDB or due to the JNI 
bridge.

Best,
Stefan


Re: RocksDB segfaults

2017-03-23 Thread Robert Metzger
Florian, can you post the log of the TaskManager where the segfault
happened?


Re: RocksDB segfaults

2017-03-24 Thread Florian König
Hi,

@Robert: I have uploaded all the log files that I could get my hands on to 
https://www.dropbox.com/sh/l35q6979hy7mue7/AAAe1gABW59eQt6jGxA3pAYaa?dl=0. I 
tried to remove all unrelated messages logged by the job itself. In 
flink-root-jobmanager-0-micardo-dev.log I kept the Flink startup messages and 
the last half hour before the segfault.

@Stefan: Your theory could be the key. In the stack trace I see a call to 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge 
that results in a call to 
com.micardo.backend.tail.TransitionProcessor$2.getValue()Ljava/lang/Long later. 
Here’s a slimmed-down snippet of the relevant code:

class TransitionProcessor extends RichFlatMapFunction<…, Reaction> {

    transient ValueState<IdSet> headSeenState;

    public void open(final Configuration parameters) throws Exception {
        headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);

        getRuntimeContext()
            .getMetricGroup()
            .gauge("head-seen", new Gauge<Long>() {
                public Long getValue() {
                    try {
                        return headSeenState.value().count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                }
            });
    }
    …
}

FlinkUtil.getStateHandle instantiates a ValueStateDescriptor and acquires a 
reference to that state via the RuntimeContext of the RichFunction passed as 
"this" in the above code.

Further along in the stack trace I see that headSeenState.value() results in a 
call to org.apache.flink.contrib.streaming.state.RocksDBValueState.value() and 
then to org.rocksdb.RocksDB.get(J[BIJ).

It looks like part of the metrics system asynchronously reads the value of the 
gauge and needs RocksDB for that. Is it possible that this thread does not hold 
the checkpointing lock you were talking about?

Best regards
Florian



Re: RocksDB segfaults

2017-03-24 Thread Stephan Ewen
The code will not work properly, sorry.

Keyed state is scoped to the current key, so the value returned by the state
is whatever is stored under the key for which the function was last called.
In addition, the unsynchronized access from the metrics thread is most likely
what causes the RocksDB fault.

TL;DR - "ValueState" / "ListState" / etc. in Flink are not intended to be
used across threads.
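
One possible way to act on this, as a sketch under assumptions rather than a
confirmed fix: the processing thread copies the number it wants to expose into
a plain thread-safe field, and the gauge reads only that field, never the
state handle. The Gauge interface below is a local stand-in with the same
shape as Flink's org.apache.flink.metrics.Gauge so that the example compiles
on its own; HeadSeenTracker, onElement and the HashMap that stands in for
keyed state are hypothetical names, not part of Flink or of the code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class CachedGaugeSketch {

    /** Local stand-in shaped like Flink's Gauge (assumption, for self-containment). */
    public interface Gauge<T> {
        T getValue();
    }

    /** Hypothetical operator: per-key counts are touched only by the processing thread. */
    public static class HeadSeenTracker {
        // Stands in for keyed state; only the processing thread reads or writes it.
        private final Map<String, Long> countsPerKey = new HashMap<>();
        // Thread-safe copy that the metrics thread is allowed to read.
        private final AtomicLong lastSeenCount = new AtomicLong(0L);

        /** Called by the processing thread for each element. */
        public void onElement(String key) {
            long updated = countsPerKey.merge(key, 1L, Long::sum);
            lastSeenCount.set(updated); // publish a copy, not the state handle
        }

        /** The gauge only reads the published copy. */
        public Gauge<Long> gauge() {
            return lastSeenCount::get;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HeadSeenTracker tracker = new HeadSeenTracker();
        Gauge<Long> gauge = tracker.gauge();

        tracker.onElement("a");
        tracker.onElement("a");
        tracker.onElement("b");

        // Simulate the metrics reporter polling from a different thread.
        Thread reporter = new Thread(
            () -> System.out.println("head-seen = " + gauge.getValue()));
        reporter.start();
        reporter.join();
    }
}
```

Note that the gauge still reports the count for whichever key was processed
last, which matches the first point above; a job-wide number would need its
own aggregation.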




Re: RocksDB segfaults

2017-03-28 Thread Florian König
Thank you Stephan for spotting the problem. In hindsight it’s obvious that this 
can never work. I’ll figure something out :)


Re: Rocksdb implementation

2020-05-18 Thread Jaswin Shah
/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2020 All Rights Reserved.
 */
package com.paytm.reconsys.functions.processfunctions;

import com.paytm.reconsys.Constants;
import com.paytm.reconsys.configs.ConfigurationsManager;
import com.paytm.reconsys.enums.DescripancyTypeEnum;
import com.paytm.reconsys.exceptions.MissingConfigurationsException;
import com.paytm.reconsys.messages.ResultMessage;
import com.paytm.reconsys.messages.cart.CartMessage;
import com.paytm.reconsys.messages.cart.Payment;
import com.paytm.reconsys.messages.pg.PGMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;

/**
 * CoProcessFunction to process cart and pg messages connected using the connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction {

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private MapState<String, CartMessage> cartState = null;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private MapState<String, PGMessage> pgState = null;

    /**
     * Initializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
            "cartData",
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PGMessage> pgStateDescriptor = new MapStateDescriptor<>(
            "pgData",
            TypeInformation.of(String.class),
            TypeInformation.of(PGMessage.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context,
                                Collector collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
        if (pgState.contains(searchKey)) {
            generateResultMessage(cartMessage, pgState.get(searchKey));
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PGMessage pgMessage, Context context,
                                Collector collector) throws Exception {
        String searchKey = pgMessage.createJoinStringCondition();
        if (cartState.contains(searchKey)) {
            generateResultMessage(cartState.get(searchKey), pgMessage);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }


    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private ResultMessage generateResultMessage(CartMessage cartMessage, PGMessage pgMessage) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
  

Re: Rocksdb implementation

2020-05-18 Thread Arvid Heise
Hi Jaswin,

I'd discourage using RocksDB directly. It's more of an implementation
detail of Flink. I'd also discourage writing to Kafka directly without
using our Kafka sink, as you will receive duplicates upon recovery.

If you run the KeyedCoProcessFunction continuously anyway, I'd add a timer
(2 days?) [1] for all unmatched records and, when the timer fires,
output the record through a side output [2], where you do your batch logic.
Then you don't need a separate batch job to clean that up. If you actually
want to output to Kafka for some other application, you just need to stream
the side output to a KafkaProducer.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
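
To make the timer idea concrete, here is a minimal, framework-free sketch of the logic in plain Java. It is only a model, not Flink code: the class `UnmatchedExpiryModel`, its `Pending` type, and the string payloads are hypothetical names invented for illustration. In an actual KeyedCoProcessFunction the deadline would be registered with `context.timerService().registerProcessingTimeTimer(...)`, and `onTimer` would emit the stale record with `context.output(outputTag, record)`; the two lists below merely stand in for the main output and the side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "join two streams, expire unmatched records after a timeout". */
final class UnmatchedExpiryModel {

    /** A buffered record plus the time at which it counts as unmatched (hypothetical type). */
    static final class Pending {
        final String payload;
        final long deadline;

        Pending(String payload, long deadline) {
            this.payload = payload;
            this.deadline = deadline;
        }
    }

    private final long timeoutMillis;
    private final Map<String, Pending> cartState = new HashMap<>();
    private final Map<String, Pending> pgState = new HashMap<>();

    final List<String> matched = new ArrayList<>();   // stands in for the main output
    final List<String> unmatched = new ArrayList<>(); // stands in for the side output

    UnmatchedExpiryModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Mirrors processElement1: look for a pg partner, otherwise buffer the cart record. */
    void onCart(String key, String cart, long now) {
        Pending partner = pgState.remove(key);
        if (partner != null) {
            matched.add(cart + "|" + partner.payload);
        } else {
            cartState.put(key, new Pending(cart, now + timeoutMillis));
        }
    }

    /** Mirrors processElement2: look for a cart partner, otherwise buffer the pg record. */
    void onPg(String key, String pg, long now) {
        Pending partner = cartState.remove(key);
        if (partner != null) {
            matched.add(partner.payload + "|" + pg);
        } else {
            pgState.put(key, new Pending(pg, now + timeoutMillis));
        }
    }

    /** Plays the role of onTimer: move every record whose deadline has passed to the side output. */
    void advanceTo(long now) {
        expire(cartState, now);
        expire(pgState, now);
    }

    private void expire(Map<String, Pending> state, long now) {
        Iterator<Map.Entry<String, Pending>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Pending p = it.next().getValue();
            if (p.deadline <= now) {
                unmatched.add(p.payload);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        UnmatchedExpiryModel model = new UnmatchedExpiryModel(1_000L);
        model.onCart("order-1", "cart1", 0L);
        model.onPg("order-1", "pg1", 10L);   // matches the buffered cart record
        model.onPg("order-2", "pg2", 20L);   // never matched
        model.advanceTo(1_020L);             // order-2 passes its deadline
        System.out.println("matched=" + model.matched + " unmatched=" + model.unmatched);
    }
}
```

Because expired entries are removed from the buffers as they are flushed, the state does not grow without bound, which is the main reason no separate cleanup job is needed.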


Re: Rocksdb implementation

2020-05-18 Thread Yun Tang
Hi Jaswin

As Arvid suggested, querying the internal RocksDB directly is not encouraged. 
Apart from Arvid's solution, I think queryable state [1] might also help you. I 
think you just want to know which entries are left in both map states after several 
days, and querying the state should meet that need. Please refer to the official doc 
and this example [2] for more details.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
[2] 
https://github.com/apache/flink/blob/def4cbb762f849ce5b5c9c6bd367fd62bc8f45de/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java#L108-L122

Best
Yun Tang


Re: Rocksdb implementation

2020-05-19 Thread Jaswin Shah
++


Re: Rocksdb implementation

2020-05-19 Thread Congxian Qiu
Hi

Flink stores state in a state backend. There are two kinds of state backend:
heap-based backends, which store state on the JVM heap, and RocksDBStateBackend,
which stores state in RocksDB.

You can select RocksDB in either of the following ways [1]:
1. add `env.setStateBackend(...);` in your code
2. add the configuration `state.backend: rocksdb` to `flink-conf.yaml`

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#configuring-a-state-backend
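
For the second option, a `flink-conf.yaml` fragment might look roughly like the sketch below. The checkpoint directory is a placeholder and must be replaced with a location that exists in your setup; enabling incremental checkpoints is optional and shown only as a common companion setting.

```yaml
# flink-conf.yaml (sketch; the checkpoint path is a placeholder, not a recommendation)
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.backend.incremental: true
```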
Best,
Congxian



Re: Rocksdb implementation

2020-05-19 Thread Jaswin Shah
Thanks Yun and Arvid.
Just a question: is it possible to have a batch execution inside the same 
streaming job? You mean I should collect the missing messages from both 
streams in a side output on timer expiry. So I would run a batch job on the side 
output, since the side output is shared with the same streaming job that I have. 
Basically, I need the information about those missing messages outside the job.


Re: Rocksdb implementation

2020-05-19 Thread Arvid Heise
Hi Jaswin,

You cannot run a DataSet program inside a DataStream program. However, you can
perform the same query on a windowed stream. So if you want to execute the
batchy part every day, you can just create a tumbling window of 24h and then
perform your batchy analysis on that time window.

Alternatively, you can dump the data into Kafka or a file system and then
run the batchy part as a separate program.
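
To illustrate what a 24h tumbling window does to the records, here is a small plain-Java sketch that assigns timestamps to non-overlapping daily windows. It is a model of the idea only, not Flink code: `DailyTumblingWindows`, `windowStart` and `assign` are hypothetical names, and it assumes non-negative epoch-millisecond timestamps with no window offset. In a Flink job the equivalent assignment would come from a window assigner such as `TumblingEventTimeWindows.of(Time.hours(24))`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of assigning records to 24h tumbling windows. */
final class DailyTumblingWindows {

    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    /** Start of the tumbling window containing the timestamp (assumes timestamp >= 0). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** Groups record timestamps by the start of the window they fall into. */
    static Map<Long, List<Long>> assign(List<Long> timestamps, long sizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, sizeMillis), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(1_000L, DAY_MILLIS - 1, DAY_MILLIS, DAY_MILLIS + 5_000L);
        // The first two timestamps fall into the window starting at 0,
        // the last two into the window starting at DAY_MILLIS.
        System.out.println(assign(timestamps, DAY_MILLIS));
    }
}
```

Each window can then be analysed as a bounded batch once it closes, which is what makes the daily window a stand-in for a separate batch job.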


Re: Rocksdb implementation

2020-05-19 Thread Jaswin Shah
If I create such a large tumbling window, the data will stay in memory for a long 
time until the window is triggered, right? So won't there be a possibility 
of data loss, or would Flink recover in case of an outage?

From: Arvid Heise 
Sent: 20 May 2020 00:10
To: Jaswin Shah 
Cc: Yun Tang ; isha.sing...@paytm.com 
; ankit.sing...@paytm.com ; 
user@flink.apache.org 
Subject: Re: Rocksdb implementation

Hi Jaswin,

you cannot run a DataSet program inside a DataStream. However, you can perform 
the same query on a windowed stream. So if you would execute the batchy part 
every day, you can just create a tumble window of 24h and then perform your 
batchy analysis on that time window.

Alternatively, you can dump the data into Kafka or a file system and then run 
the batchy part as a separate program.

On Tue, May 19, 2020 at 1:36 PM Jaswin Shah 
mailto:jaswin.s...@outlook.com>> wrote:
Thanks yun and Arvid.
Just a question, is it possible to have a batch execution inside the same 
streaming job. You meant to say I should collect the missing messages from both 
streams in sideoutput on timer expiry. So, I will execute a batch job on side 
output as sideput will be shared with the same streaming job that I have. 
Basically, I need that missing message infos outside.

From: Jaswin Shah mailto:jaswin.s...@outlook.com>>
Sent: 19 May 2020 13:29
To: Yun Tang mailto:myas...@live.com>>; Arvid Heise 
mailto:ar...@ververica.com>>; 
isha.sing...@paytm.com<mailto:isha.sing...@paytm.com> 
mailto:isha.sing...@paytm.com>>; 
ankit.sing...@paytm.com<mailto:ankit.sing...@paytm.com> 
mailto:ankit.sing...@paytm.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Re: Rocksdb implementation

++

From: Yun Tang mailto:myas...@live.com>>
Sent: 18 May 2020 23:47
To: Arvid Heise mailto:ar...@ververica.com>>; Jaswin Shah 
mailto:jaswin.s...@outlook.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Re: Rocksdb implementation

Hi Jaswin

As Arvid suggested, querying the internal RocksDB directly is not encouraged. 
Apart from Arvid's solution, I think queryable state [1] might also help you. I 
think you just want to know the entries left in both map states after several 
days, and querying the state should meet that need. Please refer to the official 
doc and this example [2] for more details.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
[2] 
https://github.com/apache/flink/blob/def4cbb762f849ce5b5c9c6bd367fd62bc8f45de/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java#L108-L122

Best
Yun Tang

From: Arvid Heise <ar...@ververica.com>
Sent: Monday, May 18, 2020 23:40
To: Jaswin Shah <jaswin.s...@outlook.com>
Cc: user@flink.apache.org
Subject: Re: Rocksdb implementation

Hi Jaswin,

I'd discourage using RocksDB directly. It's more of an implementation detail of 
Flink. I'd also discourage writing to Kafka directly without using our Kafka 
sink, as you will receive duplicates upon recovery.

If you run the KeyedCoProcessFunction continuously anyways, I'd add a timer (2 
days?) [1] for all unmatched records and on triggering of the timer, output the 
record through a side output [2], where you do your batch logic. Then you don't 
need a separate batch job to clean that up. If you actually want to output to 
Kafka for some other application, you just need to stream the side output to a 
KafkaProducer.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On Mon, May 18, 2020 at 10:30 AM Jaswin Shah <jaswin.s...@outlook.com> wrote:

/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2020 All Rights Reserved.
 */
package com.paytm.reconsys.functions.processfunctions;

import com.paytm.reconsys.Constants;
import com.paytm.reconsys.configs.ConfigurationsManager;
import com.paytm.reconsys.enums.DescripancyTypeEnum;
import com.paytm.reconsys.exceptions.MissingConfigurationsException;
import com.paytm.reconsys.messages.ResultMessage;
import com.paytm.reconsys.messages.cart.CartMessage;
import com.paytm.reconsys.messages.cart.Payment;
import com.paytm.reconsys.messages.pg.PGMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org

Re: Rocksdb implementation

2020-05-19 Thread Arvid Heise
If you enable checkpointing (which is strongly recommended) [1], no data
is lost: the window contents are part of the checkpointed state and are
restored on recovery.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html

On Tue, May 19, 2020 at 8:59 PM Jaswin Shah  wrote:

> If I create such a large tumbling window, that data will stay in memory for
> a long time until the window is triggered, right? So, won't there be a
> possibility of data loss, or would Flink recover in case of an outage?
> --
> *From:* Arvid Heise 
> *Sent:* 20 May 2020 00:10
> *To:* Jaswin Shah 
> *Cc:* Yun Tang ; isha.sing...@paytm.com <
> isha.sing...@paytm.com>; ankit.sing...@paytm.com ;
> user@flink.apache.org 
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin,
>
> you cannot run a DataSet program inside a DataStream. However, you can
> perform the same query on a windowed stream. So if you would execute the
> batchy part every day, you can just create a tumble window of 24h and then
> perform your batchy analysis on that time window.
>
> Alternatively, you can dump the data into Kafka or a file system and then
> run the batchy part as a separate program.
>
> On Tue, May 19, 2020 at 1:36 PM Jaswin Shah 
> wrote:
>
> Thanks yun and Arvid.
> Just a question, is it possible to have a batch execution inside the same
> streaming job. You meant to say I should collect the missing messages from
> both streams in sideoutput on timer expiry. So, I will execute a batch job
> on side output as sideput will be shared with the same streaming job that I
> have. Basically, I need that missing message infos outside.
> --
> *From:* Jaswin Shah 
> *Sent:* 19 May 2020 13:29
> *To:* Yun Tang ; Arvid Heise ;
> isha.sing...@paytm.com ; ankit.sing...@paytm.com <
> ankit.sing...@paytm.com>
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Rocksdb implementation
>
> ++
> --
> *From:* Yun Tang 
> *Sent:* 18 May 2020 23:47
> *To:* Arvid Heise ; Jaswin Shah <
> jaswin.s...@outlook.com>
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin
>
> As Arvid suggested, it's not encouraged to query the internal RocksDB
> directly. Apart from Arvid's solution, I think queryable state [1] might
> also help you. I think you just want to know the left entries in both of
> map state after several days and query the state should make the meet,
> please refer to the official doc and this example [2] to know more details.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
> [2]
> https://github.com/apache/flink/blob/def4cbb762f849ce5b5c9c6bd367fd62bc8f45de/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java#L108-L122
>
> Best
> Yun Tang
> --
> *From:* Arvid Heise 
> *Sent:* Monday, May 18, 2020 23:40
> *To:* Jaswin Shah 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin,
>
> I'd discourage using rocksdb directly. It's more of an implementation
> detail of Flink. I'd also discourage to write to Kafka directly without
> using our Kafka Sink, as you will receive duplicates upon recovery.
>
> If you run the KeyedCoProcessFunction continuously anyways, I'd add a
> timer (2 days?) [1] for all unmatched records and on triggering of the
> timer, output the record through a side output [2], where you do your batch
> logic. Then you don't need a separate batch job to clean that up. If you
> actually want to output to Kafka for some other application, you just need
> to stream the side output to a KafkaProducer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> On Mon, May 18, 2020 at 10:30 AM Jaswin Shah 
> wrote:
>
> /**
>  * Alipay.com Inc.
>  * Copyright (c) 2004-2020 All Rights Reserved.
>  */
> package com.paytm.reconsys.functions.processfunctions;
>
> import com.paytm.reconsys.Constants;
> import com.paytm.reconsys.configs.ConfigurationsManager;
> import com.paytm.reconsys.enums.DescripancyTypeEnum;
> import com.paytm.reconsys.exceptions.MissingConfigurationsException;
> import com.paytm.reconsys.messages.ResultMessage;
> import com.paytm.reconsys.messages.cart.CartMessage;
> import com.paytm.reconsys.messages.cart.Payment;
> import com.paytm.reconsys.messages.pg.PGMessage;
> import org.apache.commons.lang3.StringUtils;
> import org.a

Re: Rocksdb implementation

2020-05-19 Thread Jaswin Shah
Okay, so with checkpointing enabled, the window's data would also be persisted.

From: Arvid Heise 
Sent: 20 May 2020 01:05
To: Jaswin Shah 
Cc: Yun Tang; isha.sing...@paytm.com; ankit.sing...@paytm.com; 
user@flink.apache.org
Subject: Re: Rocksdb implementation

If you enabled checkpointing (which is strongly recommended) [1], no data is 
lost.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html

On Tue, May 19, 2020 at 8:59 PM Jaswin Shah <jaswin.s...@outlook.com> wrote:
If I create such a large tumbling window, that data will stay in memory for a long 
time until the window is triggered, right? So, won't there be a possibility 
of data loss, or would Flink recover in case of an outage?

From: Arvid Heise <ar...@ververica.com>
Sent: 20 May 2020 00:10
To: Jaswin Shah <jaswin.s...@outlook.com>
Cc: Yun Tang <myas...@live.com>; isha.sing...@paytm.com; 
ankit.sing...@paytm.com; user@flink.apache.org
Subject: Re: Rocksdb implementation

Hi Jaswin,

you cannot run a DataSet program inside a DataStream program. However, you can 
perform the same query on a windowed stream. So if you want to execute the batchy 
part every day, you can just create a tumbling window of 24h and then perform your 
batchy analysis on that time window.

Alternatively, you can dump the data into Kafka or a file system and then run 
the batchy part as a separate program.

On Tue, May 19, 2020 at 1:36 PM Jaswin Shah <jaswin.s...@outlook.com> wrote:
Thanks Yun and Arvid.
Just a question: is it possible to have a batch execution inside the same 
streaming job? You meant that I should collect the missing messages from both 
streams in a side output on timer expiry. So, would I execute a batch job on the 
side output, since the side output is shared with the same streaming job that I have? 
Basically, I need the information about those missing messages outside.

From: Jaswin Shah <jaswin.s...@outlook.com>
Sent: 19 May 2020 13:29
To: Yun Tang <myas...@live.com>; Arvid Heise <ar...@ververica.com>; 
isha.sing...@paytm.com; ankit.sing...@paytm.com
Cc: user@flink.apache.org
Subject: Re: Rocksdb implementation

++

From: Yun Tang <myas...@live.com>
Sent: 18 May 2020 23:47
To: Arvid Heise <ar...@ververica.com>; Jaswin Shah <jaswin.s...@outlook.com>
Cc: user@flink.apache.org
Subject: Re: Rocksdb implementation

Hi Jaswin

As Arvid suggested, querying the internal RocksDB directly is not encouraged. 
Apart from Arvid's solution, I think queryable state [1] might also help you. I 
think you just want to know the entries left in both map states after several 
days, and querying the state should meet that need. Please refer to the official 
doc and this example [2] for more details.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
[2] 
https://github.com/apache/flink/blob/def4cbb762f849ce5b5c9c6bd367fd62bc8f45de/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java#L108-L122

Best
Yun Tang

From: Arvid Heise <ar...@ververica.com>
Sent: Monday, May 18, 2020 23:40
To: Jaswin Shah <jaswin.s...@outlook.com>
Cc: user@flink.apache.org
Subject: Re: Rocksdb implementation

Hi Jaswin,

I'd discourage using RocksDB directly. It's more of an implementation detail of 
Flink. I'd also discourage writing to Kafka directly without using our Kafka 
sink, as you will receive duplicates upon recovery.

If you run the KeyedCoProcessFunction continuously anyways, I'd add a timer (2 
days?) [1] for all unmatched records and on triggering of the timer, output the 
record through a side output [2], where you do your batch logic. Then you don't 
need a separate batch job to clean that up. If you actually want to output to 
Kafka for some other application, you just need to stream the side output to a 
KafkaProducer.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
[2] 
https://ci.apache.org/projects/fli

Re: Rocksdb Metrics

2018-09-25 Thread Stefan Richter
Hi,

this feature is tracked here https://issues.apache.org/jira/browse/FLINK-10423 


Best,
Stefan

> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev :
> 
> Flink provides a rich set of metrics. However, I didn't find any metrics 
> for the RocksDB state backend, neither in the metrics docs nor in the JMX MBeans. 
> 
> Are there any metrics for the RocksDB backend that Flink exposes?



Re: Rocksdb Metrics

2018-09-26 Thread Yun Tang
Hi Sayat

Before this future is on, you could also find some metrics information, such as 
hit/miss count, file status from RocksDB itself. By default, RocksDB will dump 
its stats to its information LOG file every 10 minutes (you could call 
DBOptions.setStatsDumpPeriodSec to reduce the time interval), and you could 
find the information LOG file under rocksDB state-backend's db folder.

Best
Yun

From: Stefan Richter 
Sent: Wednesday, September 26, 2018 0:56
To: Sayat Satybaldiyev
Cc: user@flink.apache.org
Subject: Re: Rocksdb Metrics

Hi,

this feature is tracked here https://issues.apache.org/jira/browse/FLINK-10423

Best,
Stefan

Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev 
mailto:saya...@gmail.com>>:

Flink provides a rich number of metrics. However, I didn't find any metrics for 
rocksdb state backend not in metrics doc nor in JMX Mbean.

Is there are any metrics for the rocksdb backend that Flink exposes?



Re: Rocksdb Metrics

2018-09-26 Thread Sayat Satybaldiyev
Thank you for this information. @Yun is there an easy way to expose the
number of records in RocksDB?

On Wed, Sep 26, 2018 at 9:47 AM Yun Tang  wrote:

> Hi Sayat
>
> Before this future is on, you could also find some metrics information,
> such as hit/miss count, file status from RocksDB itself. By default,
> RocksDB will dump its stats to its information LOG file every 10 minutes
> (you could call DBOptions.setStatsDumpPeriodSec to reduce the time
> interval), and you could find the information LOG file under rocksDB
> state-backend's db folder.
>
> Best
> Yun
> --
> *From:* Stefan Richter 
> *Sent:* Wednesday, September 26, 2018 0:56
> *To:* Sayat Satybaldiyev
> *Cc:* user@flink.apache.org
> *Subject:* Re: Rocksdb Metrics
>
> Hi,
>
> this feature is tracked here
> https://issues.apache.org/jira/browse/FLINK-10423
>
> Best,
> Stefan
>
> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev :
>
> Flink provides a rich number of metrics. However, I didn't find any
> metrics for rocksdb state backend not in metrics doc nor in JMX Mbean.
>
> Is there are any metrics for the rocksdb backend that Flink exposes?
>
>
>


Re: Rocksdb Metrics

2018-09-26 Thread Sayat Satybaldiyev
Actually, right after writing my question I realized that I can do it with
custom metrics, by exposing the size of the state map.
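
A minimal sketch of that idea in plain Java, with no Flink dependency so it
runs on its own. The Gauge interface below is a local stand-in with the same
single getValue() method as Flink's org.apache.flink.metrics.Gauge; CountingMap
and its method names are hypothetical. Since Flink's MapState has no size()
method and iterating a RocksDB-backed map just to count it is costly, the
sketch keeps a counter next to the map and lets the gauge read it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class StateSizeGaugeSketch {

    /** Local stand-in for a metrics gauge: one method that returns the current value. */
    interface Gauge<T> {
        T getValue();
    }

    /** Hypothetical wrapper that tracks the number of entries alongside the map itself. */
    static class CountingMap<K, V> {
        private final Map<K, V> entries = new HashMap<>(); // stands in for the state map
        private final AtomicLong size = new AtomicLong();   // read by the metrics reporter

        /** Assumes non-null values, so a null return from put means the key is new. */
        void put(K key, V value) {
            if (entries.put(key, value) == null) {
                size.incrementAndGet();
            }
        }

        void remove(K key) {
            if (entries.remove(key) != null) {
                size.decrementAndGet();
            }
        }

        Gauge<Long> sizeGauge() {
            return size::get;
        }
    }

    public static void main(String[] args) {
        CountingMap<String, String> state = new CountingMap<>();
        Gauge<Long> gauge = state.sizeGauge();
        state.put("order-1", "cart");
        state.put("order-2", "cart");
        state.put("order-1", "pg");   // overwrite, count unchanged
        state.remove("order-2");
        System.out.println("entries = " + gauge.getValue());
    }
}
```

In a Flink rich function, such a gauge would typically be registered in open()
through getRuntimeContext().getMetricGroup().gauge(...). Note that the counter
in this sketch is per instance and is not itself part of the state, so it would
have to be rebuilt or stored in state to survive a restore.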

On Wed, Sep 26, 2018 at 11:57 AM Sayat Satybaldiyev 
wrote:

> Thank you for this information. @Yun is there an easy way to expose a
> number of records in rockdsdb?
>
> On Wed, Sep 26, 2018 at 9:47 AM Yun Tang  wrote:
>
>> Hi Sayat
>>
>> Before this future is on, you could also find some metrics information,
>> such as hit/miss count, file status from RocksDB itself. By default,
>> RocksDB will dump its stats to its information LOG file every 10 minutes
>> (you could call DBOptions.setStatsDumpPeriodSec to reduce the time
>> interval), and you could find the information LOG file under rocksDB
>> state-backend's db folder.
>>
>> Best
>> Yun
>> --
>> *From:* Stefan Richter 
>> *Sent:* Wednesday, September 26, 2018 0:56
>> *To:* Sayat Satybaldiyev
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Rocksdb Metrics
>>
>> Hi,
>>
>> this feature is tracked here
>> https://issues.apache.org/jira/browse/FLINK-10423
>>
>> Best,
>> Stefan
>>
>> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev :
>>
>> Flink provides a rich number of metrics. However, I didn't find any
>> metrics for rocksdb state backend not in metrics doc nor in JMX Mbean.
>>
>> Is there are any metrics for the rocksdb backend that Flink exposes?
>>
>>
>>


Re: rocksdb without checkpointing

2017-02-15 Thread vinay patil
Hi Abhishek,

Checkpointing is disabled by default, so you can turn it off by simply not
calling env.enableCheckpointing (for example, by commenting that call out).

What do you mean by "We are trying to do application level checkpointing"?

Regards,
Vinay Patil

On Thu, Feb 16, 2017 at 12:42 AM, abhishekrs [via Apache Flink User Mailing
List archive.]  wrote:

> Is it possible to set state backend as RocksDB without asking it to
> checkpoint?
>
> We are trying to do application level checkpointing (since it gives us
> better flexibility to upgrade our flink pipeline and also restore state in
> a application specific upgrade friendly way). So we don’t really need
> rocksDB to do any checkpointing. Moreover, we also observed that there is
> 20s stall every hour that seems to correlate with rocksDB wanting to
> checkpoint.
>
> Will the following work (effectively disable checkpointing)?
>
> new RocksDBStateBackend("file:///dev/null")
>
>
> Or is there a better way?
>
> -Abhishek-
>
>
>





Re: rocksdb without checkpointing

2017-02-15 Thread Abhishek Singh
Sorry, that was a red herring. Checkpointing was not getting triggered
because we never enabled it.

Our application is inherently restartable because we can use our own output
to rebuild state. All that is working fine for us - including restart
semantics - without having to worry about upgrading flink topology. Once we
have something in production, will be happy to share more details in flink
forums. We are very pleased with flink so far. Some paradigms are messy
(scale of select for e.g), but we are very pleased overall !!


On Wed, Feb 15, 2017 at 7:54 PM vinay patil  wrote:

> Hi Abhishek,
>
> You can disable checkpointing by not commenting env.enableCheckpointing
>
> What do you mean by "We are trying to do application level checkpointing"
>
> Regards,
> Vinay Patil
>
> On Thu, Feb 16, 2017 at 12:42 AM, abhishekrs [via Apache Flink User
> Mailing List archive.] wrote:
>
> Is it possible to set state backend as RocksDB without asking it to
> checkpoint?
>
> We are trying to do application level checkpointing (since it gives us
> better flexibility to upgrade our flink pipeline and also restore state in
> a application specific upgrade friendly way). So we don’t really need
> rocksDB to do any checkpointing. Moreover, we also observed that there is
> 20s stall every hour that seems to correlate with rocksDB wanting to
> checkpoint.
>
> Will the following work (effectively disable checkpointing)?
>
> new RocksDBStateBackend("file:///dev/null")
>
>
> Or is there a better way?
>
> -Abhishek-
>
>
>


Re: rocksdb without checkpointing

2017-02-15 Thread vinay patil
Good to hear that.

Which machine type are you running your Flink job on, and which
configuration have you used for RocksDB?

I am currently running on C3.4xlarge with the predefined options set to
FLASH_SSD_OPTIMIZED.

Regards,
Vinay Patil

On Thu, Feb 16, 2017 at 10:31 AM, abhishekrs [via Apache Flink User Mailing
List archive.]  wrote:

> Sorry, that was a red herring. Checkpointing was not getting triggered
> because we never enabled it.
>
> Our application is inherently restartable because we can use our own
> output to rebuild state. All that is working fine for us - including
> restart semantics - without having to worry about upgrading flink topology.
> Once we have something in production, will be happy to share more details
> in flink forums. We are very pleased with flink so far. Some paradigms are
> messy (scale of select for e.g), but we are very pleased overall !!
>
>
> On Wed, Feb 15, 2017 at 7:54 PM vinay patil wrote:
>
>> Hi Abhishek,
>>
>> You can disable checkpointing by not commenting env.enableCheckpointing
>>
>> What do you mean by "We are trying to do application level checkpointing"
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Feb 16, 2017 at 12:42 AM, abhishekrs [via Apache Flink User
>> Mailing List archive.] wrote:
>>
>> Is it possible to set state backend as RocksDB without asking it to
>> checkpoint?
>>
>> We are trying to do application level checkpointing (since it gives us
>> better flexibility to upgrade our flink pipeline and also restore state in
>> a application specific upgrade friendly way). So we don’t really need
>> rocksDB to do any checkpointing. Moreover, we also observed that there is
>> 20s stall every hour that seems to correlate with rocksDB wanting to
>> checkpoint.
>>
>> Will the following work (effectively disable checkpointing)?
>>
>> new RocksDBStateBackend("file:///dev/null")
>>
>>
>> Or is there a better way?
>>
>> -Abhishek-
>>
>>
>>
>
>
>





Re: Rocksdb Serialization issue

2020-03-04 Thread Arvid Heise
Hi David,

the obvious reason is that your state stored an enum value that is not
present anymore. It tries to deserialize the enum entry with ordinal 512,
which is not available.

However, since it's highly unlikely that you actually have that many enum
values in the same enum class, we are actually looking at a corrupt stream,
which is hard to fix. Could you describe which state you have?
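
To make the failure mode concrete, here is a small self-contained sketch. It
assumes, as the EnumSerializer.deserialize frame in the stack trace suggests,
that an enum is written as an int ordinal and read back by indexing into the
array of enum constants. The Status enum and the helper names are made up for
illustration, and this is not Flink's actual code. If the reader is positioned
at the wrong bytes, the int it reads is arbitrary and the lookup fails in the
same way.

```java
import java.nio.ByteBuffer;

public class EnumOrdinalSketch {

    enum Status { NEW, MATCHED, EXPIRED } // hypothetical enum with three constants

    /** Writes the enum as its ordinal, a 4-byte big-endian int. */
    static byte[] serialize(Status status) {
        return ByteBuffer.allocate(4).putInt(status.ordinal()).array();
    }

    /** Reads an int and uses it as an index into the enum constants. */
    static Status deserialize(byte[] data) {
        int ordinal = ByteBuffer.wrap(data).getInt();
        return Status.values()[ordinal]; // throws if ordinal >= 3
    }

    public static void main(String[] args) {
        // Round trip on intact bytes works.
        System.out.println(deserialize(serialize(Status.MATCHED)));

        // Bytes that decode to 512 (0x00000200), e.g. because the stream is misaligned.
        byte[] corrupt = {0, 0, 2, 0};
        try {
            deserialize(corrupt);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("corrupt stream: " + e.getMessage());
        }
    }
}
```

An index of 512 with only a handful of constants is therefore a strong hint
that the bytes being read were never written as this enum in the first place.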

Did you upgrade Flink or your application? If it's Flink, it's a bug. If
it's the application, the state may be incompatible and would need to be
migrated.

Did you restart from checkpoint or savepoint?

On Thu, Mar 5, 2020 at 1:14 AM David Morin 
wrote:

> Hello,
>
> I have this Exception in my datastream app and I can't find the root cause.
> I consume data from Kafka and it fails when I try to get a value from my
> MapState in RocksDB.
> It was working in previous release of my app but I can't find the cause of
> this error.
>
> java.lang.ArrayIndexOutOfBoundsException: 512
> at
> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
> at
> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
> at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
> at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
> at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
> ..
>
> Flink version: 1.9.2
>
>
>


Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Hello Arvid,

After some investigation with the help of my colleague, we finally found
the root cause.
To speed up the initialization of the state, I've created some threads to
parallelize the reading of the bucket files.
This is a temporary solution because I plan to use the State
Processor API.
Here is an excerpt of the code:

    ExecutorService executorService =
        Executors.newFixedThreadPool(initStateMaxThreads);
    for (FileStatus bucketFile : xxx) {
        executorService.submit(() -> {
            try {
                readBucketFct(XXX); // Update the state with the bucket content...
            } catch (Exception e) {   }
        });
    }
    executorService.shutdown();
    boolean terminated =
        executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
    if ((!terminated) || (readMetaErrors.get() > 0)) {
        throw new SinkException("Init state failed...");
    }

After some tests: if I use one thread in my executorService it works, but
with 2 threads the job fails.
Can I mitigate this behaviour while waiting to switch to the State Processor
API?

Thanks
David
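
One possible mitigation, sketched in plain Java and assuming the failure comes
from several threads touching the state at the same time (Flink state objects
are not meant for concurrent access, and keyed state is additionally bound to
the key of the record currently being processed): let the worker threads only
read and parse the bucket files, and apply the results to the state from the
calling thread. readBucket, the String file names, and the HashMap standing in
for the MapState are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ParallelInitSketch {

    /** Hypothetical reader: parses one bucket file into key/value pairs, touching no shared state. */
    static Map<String, String> readBucket(String bucketFile) {
        Map<String, String> rows = new HashMap<>();
        rows.put(bucketFile + "#pk1", "row from " + bucketFile);
        return rows;
    }

    /** Reads all buckets in parallel, then fills the state from the calling thread only. */
    static Map<String, String> initState(List<String> bucketFiles, int maxThreads, long timeoutSeconds) {
        ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Future<Map<String, String>>> pending = new ArrayList<>();
            for (String bucketFile : bucketFiles) {
                pending.add(executor.submit(() -> readBucket(bucketFile)));
            }
            Map<String, String> state = new HashMap<>(); // stands in for the MapState
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
            for (Future<Map<String, String>> result : pending) {
                long remaining = Math.max(deadline - System.nanoTime(), 0);
                // get() rethrows reader failures, so errors are not silently swallowed
                state.putAll(result.get(remaining, TimeUnit.NANOSECONDS));
            }
            return state;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Init state interrupted", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Init state failed", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Map<String, String> state = initState(Arrays.asList("bucket_00000", "bucket_00001"), 2, 30);
        System.out.println(state.size() + " entries loaded");
    }
}
```

In the real job, the putAll step would become the state updates performed on
the task thread inside the process method, so the file reads stay parallel
while the state is only ever accessed by one thread.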


Le jeu. 5 mars 2020 à 08:06, Arvid Heise  a écrit :

> Hi David,
>
> the obvious reason is that your state stored an enum value that is not
> present anymore. It tries to deserialize the 512. entry in your enum that
> is not available.
>
> However, since it's highly unlikely that you actually have that many enum
> values in the same enum class, we are actually looking at a corrupt stream,
> which is hard to fix. Could you describe which state you have?
>
> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
> it's application, it may be that state is incompatible and would need to be
> migrated.
>
> Did you restart from checkpoint or savepoint?
>
> On Thu, Mar 5, 2020 at 1:14 AM David Morin 
> wrote:
>
>> Hello,
>>
>> I have this Exception in my datastream app and I can't find the root
>> cause.
>> I consume data from Kafka and it fails when I try to get a value from my
>> MapState in RocksDB.
>> It was working in previous release of my app but I can't find the cause
>> of this error.
>>
>> java.lang.ArrayIndexOutOfBoundsException: 512
>> at
>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>> at
>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
>> at
>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>> ..
>>
>> Flink version: 1.9.2
>>
>>
>>


Re: Rocksdb Serialization issue

2020-03-05 Thread Arvid Heise
Hi David,

could you please explain what you are actually trying to achieve?

It seems like you are reading some files from S3 in SinkFunction#open
and putting them into state (bootstrapping?).
How many instances of the sink are executed?
How do you shard the buckets, i.e. how do you avoid reading the same file
on multiple parallel sinks?
Is your sink running in a keyed context? Maybe you could even share the
general pipeline.

On Thu, Mar 5, 2020 at 2:29 PM David Morin 
wrote:

> Hello Arvid,
>
> After some investigations with the help of my colleague we finally found
> the root cause.
> In order to improve the init of the state, I've created some threads to
> parallelize the read of bucket files.
> This is a temporary solution because I've planned to use the State
> Processor API.
> Here is an excerpt of the code:
>
>     ExecutorService executorService =
>         Executors.newFixedThreadPool(initStateMaxThreads);
>     for (FileStatus bucketFile : xxx) {
>         executorService.submit(() -> {
>             try {
>                 readBucketFct(XXX); // Update the state with the bucket content...
>             } catch (Exception e) {   }
>         });
>     }
>     executorService.shutdown();
>     boolean terminated =
>         executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
>     if ((!terminated) || (readMetaErrors.get() > 0)) {
>         throw new SinkException("Init state failed...");
>     }
>
> After some tests: if I use one thead in my executorService it works. But
> with 2 threads the job fails.
> Can I mitigate this behaviour (in waiting the switch to the State
> Processor API) ?
>
> Thanks
> David
>
>
> Le jeu. 5 mars 2020 à 08:06, Arvid Heise  a écrit :
>
>> Hi David,
>>
>> the obvious reason is that your state stored an enum value that is not
>> present anymore. It tries to deserialize the 512. entry in your enum that
>> is not available.
>>
>> However, since it's highly unlikely that you actually have that many enum
>> values in the same enum class, we are actually looking at a corrupt stream,
>> which is hard to fix. Could you describe which state you have?
>>
>> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
>> it's application, it may be that state is incompatible and would need to be
>> migrated.
>>
>> Did you restart from checkpoint or savepoint?
>>
>> On Thu, Mar 5, 2020 at 1:14 AM David Morin 
>> wrote:
>>
>>> Hello,
>>>
>>> I have this Exception in my datastream app and I can't find the root
>>> cause.
>>> I consume data from Kafka and it fails when I try to get a value from my
>>> MapState in RocksDB.
>>> It was working in previous release of my app but I can't find the cause
>>> of this error.
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: 512
>>> at
>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>>> at
>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
>>> at
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>> ..
>>>
>>> Flink version: 1.9.2
>>>
>>>
>>>


Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Yes Arvid, the Sink is keyed by a String dbName::tableName.
The input is Kafka, but to initialize the state we have to read Hive delta
files before consuming Kafka records. These are ORC files, with one
directory per table.
A given key (primary key) lives in exactly one bucket file. So to initialize
the state per table (per keyed stream, in fact) we created a thread pool to
read several bucket files in parallel.
This task runs when the first record for a table is read from Kafka and the
state for that table does not exist yet (so this code is in the process
method).
After that, we can snapshot the state and reuse it, but this initialization
must be done at least once.
We can have more than one instance of the Sink, but each task runs in its own
JVM and we can't have more than one task per table (keyed stream) at the
moment. Sharding has been developed but not yet tested.
We use a YARN session and specified the --slots option to force one task
per TaskManager because we use a library (dependency) that is not thread-safe.
So if I'm right, we can't read the same bucket file on multiple parallel
sinks at the moment.
But yes, to make this per-table state initialization faster, I naively
created this thread pool.
If I can keep this as a workaround, that would be great (while waiting for a
better solution: sharding, State Processor API, ...).
I'm open to any suggestion for the short or the long term.
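
One possible shape for such a workaround, as a minimal sketch: as far as I
know, Flink state objects are not thread-safe and are meant to be accessed
from the task thread only, which would explain why one thread works and two
do not. The sketch below keeps the parallel part limited to reading the
files and applies the results from the calling thread. The names
ParallelBucketInit, readBucket and initState are hypothetical, the bucket
reader is a stub, and a plain HashMap stands in for the MapState.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ParallelBucketInit {

    // Hypothetical stand-in for reading one ORC bucket file.
    // It only builds a local map and never touches shared state.
    static Map<String, String> readBucket(String bucketFile) {
        Map<String, String> rows = new HashMap<>();
        rows.put(bucketFile + "-pk", "row-from-" + bucketFile);
        return rows;
    }

    // Reads all bucket files in parallel, then merges the results
    // sequentially on the calling thread.
    static Map<String, String> initState(List<String> bucketFiles,
                                         int maxThreads,
                                         long timeoutSeconds) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Callable<Map<String, String>>> tasks = new ArrayList<>();
            for (String file : bucketFiles) {
                tasks.add(() -> readBucket(file));
            }
            // invokeAll waits for completion or timeout; unfinished tasks are cancelled.
            List<Future<Map<String, String>>> futures =
                pool.invokeAll(tasks, timeoutSeconds, TimeUnit.SECONDS);

            // Assumption: in a real job this loop would call mapState.put(...)
            // from the task thread instead of filling a HashMap.
            Map<String, String> state = new HashMap<>();
            for (Future<Map<String, String>> future : futures) {
                // get() throws if the read failed or was cancelled by the timeout.
                state.putAll(future.get());
            }
            return state;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> state =
            initState(Arrays.asList("bucket_0", "bucket_1", "bucket_2"), 2, 30);
        System.out.println("entries loaded: " + state.size());
    }
}
```

The trade-off is that the parsed rows are held in memory before they are
written to state, so very large tables may need to be merged in batches.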

Thanks

Le jeu. 5 mars 2020 à 14:35, Arvid Heise  a écrit :

> Hi David,
>
> could you please explain what you are actually trying to achieve?
>
> It seems like you are reading in the SinkFunction#open some files from S3
> and put it into state (bootstrapping?)
> How many instances of the sink are executed?
> How do you shard the buckets / e.g. how do you avoid reading the same file
> on multiple parallel sinks?
> Is your sink running in a keyed context? Maybe even provide the general
> pipeline.
>
> On Thu, Mar 5, 2020 at 2:29 PM David Morin 
> wrote:
>
>> Hello Arvid,
>>
>> After some investigations with the help of my colleague we finally found
>> the root cause.
>> In order to improve the init of the state, I've created some threads to
>> parallelize the read of bucket files.
>> This is a temporary solution because I've planned to use the State
>> Processor API.
>> Here is an excerpt of the code:
>>
>> ExecutorService executorService =
>>     Executors.newFixedThreadPool(initStateMaxThreads);
>> for (FileStatus bucketFile : xxx) {
>>     executorService.submit(() -> {
>>         try {
>>             readBucketFct(XXX); // Update the state with the bucket content...
>>         } catch (Exception e) {   }
>>     });
>> }
>> executorService.shutdown();
>> boolean terminated =
>>     executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
>> if ((!terminated) || (readMetaErrors.get() > 0)) {
>>     throw new SinkException("Init state failed...");
>> }
>>
>> After some tests: if I use one thread in my executorService it works. But
>> with 2 threads the job fails.
>> Can I mitigate this behaviour (while waiting for the switch to the State
>> Processor API)?
>>
>> Thanks
>> David
>>
>>
>> Le jeu. 5 mars 2020 à 08:06, Arvid Heise  a écrit :
>>
>>> Hi David,
>>>
>>> the obvious reason is that your state stored an enum value that is not
>>> present anymore. It tries to deserialize the enum entry at index 512, which
>>> is not available.
>>>
>>> However, since it's highly unlikely that you actually have that many
>>> enum values in the same enum class, we are actually looking at a corrupt
>>> stream, which is hard to fix. Could you describe which state you have?
>>>
>>> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
>>> it's application, it may be that state is incompatible and would need to be
>>> migrated.
>>>
>>> Did you restart from checkpoint or savepoint?
>>>
>>> On Thu, Mar 5, 2020 at 1:14 AM David Morin 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have this Exception in my datastream app and I can't find the root
>>>> cause.
>>>> I consume data from Kafka and it fails when I try to get a value from
>>>> my MapState in RocksDB.
>>>> It was working in previous release of my app but I can't find the cause
>>>> of this error.
>>>>
>>>> java.lang.ArrayIndexOutOfBoundsException: 512
>>>> at
>>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>>>> at
>>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java

Re: RocksDB Read IOPs

2018-09-25 Thread Yun Tang
Hi Ning

From your description, I think you are actually more concerned about the
overall performance than about the high disk IOPS. Maybe you should first
verify whether the job performance degradation is related to RocksDB's
performance.

Then I would share some experience about tuning RocksDB performance. Since you
do not cache index and filter blocks in the block cache, there is no need to
worry about competition between data blocks and index & filter blocks [1]. To
improve read performance, you should increase your block cache size to 256MB
or even 512MB. What's more, the write buffer in RocksDB also serves reads.
From our experience, we use 4 max write buffers of 32MB each, e.g.
setMaxWriteBufferNumber(4) and setWriteBufferSize(32*1024*1024).
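
To make the memory cost of these numbers concrete, here is a rough
back-of-the-envelope sketch. It assumes that every state (column family) gets
its own block cache and its own set of memtables, which depends on the Flink
version and on how the options factory is written. The slot count and the
number of states per slot are hypothetical, and index and filter blocks are
ignored, so treat the result as an order-of-magnitude estimate only.

```java
class RocksDbMemoryEstimate {

    static final long MB = 1024L * 1024L;

    // Approximate upper bound for one column family:
    // block cache plus all memtables that may exist at the same time.
    static long perStateBytes(long blockCacheBytes, long writeBufferBytes, int maxWriteBuffers) {
        return blockCacheBytes + writeBufferBytes * maxWriteBuffers;
    }

    // Scales the per-state bound by the number of slots and states per slot.
    static long perTaskManagerBytes(long perStateBytes, int slots, int statesPerSlot) {
        return perStateBytes * slots * statesPerSlot;
    }

    public static void main(String[] args) {
        // 256MB block cache, 4 write buffers of 32MB each (values from the message above).
        long perState = perStateBytes(256 * MB, 32 * MB, 4);
        // Hypothetical deployment: 8 slots per TaskManager, 2 stateful operators per slot.
        long perTm = perTaskManagerBytes(perState, 8, 2);
        System.out.println("per state: " + perState / MB + " MB");
        System.out.println("per TaskManager: " + perTm / MB + " MB");
    }
}
```

With these inputs the bound is 384 MB per state and 6144 MB per TaskManager,
all of it native memory outside the JVM heap.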

Best
Yun Tang

[1] 
https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks




From: Ning Shi 
Sent: Wednesday, September 26, 2018 11:25
To: user
Subject: RocksDB Read IOPs

Hi,

I'm benchmarking a job with large state in various window sizes
(hourly, daily). I noticed that it would consistently slow down after
30 minutes into the benchmark due to high disk read IOPs. The first 30
minutes were fine, with close to 0 disk IOPs. Then after 30 minutes,
read IOPs would gradually climb to as high as 10k/s. At this point,
the job was bottlenecked on disk IOPs because I'm using 2TB EBS-backed
volume.

Another thread on the mailing list mentioned potentially running into
burst IOPs credit could be the cause of slowdown. It's not that in
this case because I'm using 2TB EBS.

Someone also mentioned RocksDB compaction could potentially increase
read IOPs a lot.

I'm currently running the job with these RocksDB settings.

@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
    return currentOptions
        .setIncreaseParallelism(4)
        .setUseFsync(false)
        .setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
    final long blockCacheSize = 64 * 1024 * 1024;
    return currentOptions
        .setTableFormatConfig(
            new BlockBasedTableConfig()
                .setBlockCacheSize(blockCacheSize)
        );
}

Any insights into how I can further diagnose this? Is there any way to
see compaction stats, or are there any settings I should try?

Thanks,

Ning


Re: RocksDB Read IOPs

2018-09-26 Thread Ning Shi
Hi Yun,

> From your description, I think you actually concern more about the overall 
> performance instead of the high disk IOPs. Maybe you should first ensure 
> whether the job performance degradation is related to RocksDB's performance.

You are right that my main concern is the overall performance, not that it’s 
reading a lot. I connected the two things together because read IOPs seems to 
jump to a very high number after 30 minutes into the benchmark, which 
correlates to the timing of the overall performance degradation.

> Then I would share some experience about tuning RocksDB performance. Since 
> you did not cache index and filter in block cache, it's no worry about the 
> competition between data blocks and index&filter blocks[1]. And to improve 
> the read performance, you should increase your block cache size to 256MB or 
> even 512MB. What's more, writer buffer in rocksDB also acts as a role for 
> reading, from our experience, we use 4 max write buffers and 32MB each, e.g.  
> setMaxWriteBufferNumber(4) and setWriteBufferSize(32*1024*1024)

This is very helpful. I did try increasing the block cache to 256MB or 512MB. 
It quickly used up the 30GB memory on the EC2 instances.

I found it a little hard to estimate the actual memory usage of RocksDB as 
there might be multiple instances on the same TM depending on the number of 
slots and job. In this case, each instance has 16 cores and 30GB of memory. 
Each TM has 8 slots. The job parallelism equals the total number of slots 
across all TMs, so the job uses all the slots. Heap is set to 5GB.

With 64MB block cache size, the memory usage seems to hover around 20GB to 
25GB, but it creeps up very slowly over time.

Do you have a good strategy for memory usage estimation or recommendation for 
how much memory each instance should have?

Thanks,

Ning

Re: RocksDB Read IOPs

2018-09-27 Thread Ning Shi
Yun,

> Then I would share some experience about tuning RocksDB performance. Since 
> you did not cache index and filter in block cache, it's no worry about the 
> competition between data blocks and index&filter blocks[1]. And to improve 
> the read performance, you should increase your block cache size to 256MB or 
> even 512MB. What's more, writer buffer in rocksDB also acts as a role for 
> reading, from our experience, we use 4 max write buffers and 32MB each, e.g.  
> setMaxWriteBufferNumber(4) and setWriteBufferSize(32*1024*1024)

Thank you very much for the hints. I read that tuning guide and added
some settings. Now it's doing much much better. The IOPs stays under
300 except for when checkpoints are taken, then it spikes to about
1.5k, which is totally expected.

For reference, the following are the settings I'm using right now. The
reason I didn't bump block cache size is because we have limited
amount of memory per instance (30GB).

@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
    return currentOptions
        .setIncreaseParallelism(4)
        .setMaxBackgroundFlushes(1)
        .setMaxBackgroundCompactions(1)
        .setUseFsync(false)
        .setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
    final long blockCacheSize = 64 * 1024 * 1024;
    final long writeBufferSize = 64 * 1024 * 1024;

    return currentOptions
        .setCompressionType(CompressionType.LZ4_COMPRESSION)

        .setCompactionStyle(CompactionStyle.LEVEL)
        .setLevel0FileNumCompactionTrigger(10)
        .setLevel0SlowdownWritesTrigger(20)
        .setLevel0StopWritesTrigger(40)

        // In-memory memtable size
        .setWriteBufferSize(writeBufferSize)
        // Max number of memtables before stalling writes
        .setMaxWriteBufferNumber(5)
        // Merge two memtables together to reduce duplicate keys
        .setMinWriteBufferNumberToMerge(2)

        // L0 file size, same as memtable size
        .setTargetFileSizeBase(writeBufferSize)
        .setMaxBytesForLevelBase(writeBufferSize * 8)

        .setTableFormatConfig(
            new BlockBasedTableConfig()
                .setFilter(new BloomFilter())
                .setBlockCacheSize(blockCacheSize)
        );
}

Ning


Re: RocksDB / checkpoint questions

2018-02-02 Thread Kien Truong

On Feb 3, 2018, 10:48, at 10:48, Kien Truong  wrote:
>Hi,
>Speaking from my experience, if the distributed disk fails, the
>checkpoint will fail as well, but the job will continue running. The
>checkpoint scheduler will keep running, so the first scheduled
>checkpoint after you repair your disk should succeed.
>
>Of course, if you also write to the distributed disk inside your job,
>then your job may crash too, but this is unrelated to the checkpoint
>process.
>
>Best regards,
>Kien
>
>On Feb 2, 2018, 23:30, at 23:30, Christophe Jolif 
>wrote:
>>If I understand well RocksDB is using two disk, the Task Manager local
>>disk
>>for "local storage" of the state and the distributed disk for
>>checkpointing.
>>
>>Two questions:
>>
>>- if I have 3 TaskManager I should expect more or less (depending on
>>how
>>the tasks are balanced) to find a third of my overall state stored on
>>disk
>>on each of this TaskManager node?
>>
>>- if the local node/disk fails I will get the state back from the
>>distributed disk and things will start again and all is fine. However
>>what
>>happens if the distributed disk fails? Will Flink continue processing
>>waiting for me to mount a new distributed disk? Or will it stop? May I
>>lose
>>data/reprocess things under that condition?
>>
>>--
>>Christophe Jolif


Re: RocksDB / checkpoint questions

2018-02-03 Thread Christophe Jolif
Thanks for sharing Kien. Sounds like the logical behavior but good to hear
it is confirmed by your experience.

--
Christophe

On Sat, Feb 3, 2018 at 7:25 AM, Kien Truong  wrote:

>
>
> Sent from TypeApp 
> On Feb 3, 2018, at 10:48, Kien Truong  wrote:
>>
>> Hi,
>> Speaking from my experience, if the distributed disk fail, the checkpoint
>> will fail as well, but the job will continue running. The checkpoint
>> scheduler will keep running, so the first scheduled checkpoint after you
>> repair your disk should succeed.
>>
>> Of course, if you also write to the distributed disk inside your job,
>> then your job may crash too, but this is unrelated to the checkpoint
>> process.
>>
>> Best regards,
>> Kien
>>
>> Sent from TypeApp 
>> On Feb 2, 2018, at 23:30, Christophe Jolif < cjo...@gmail.com> wrote:
>>>
>>> If I understand well RocksDB is using two disk, the Task Manager local
>>> disk for "local storage" of the state and the distributed disk for
>>> checkpointing.
>>>
>>> Two questions:
>>>
>>> - if I have 3 TaskManager I should expect more or less (depending on how
>>> the tasks are balanced) to find a third of my overall state stored on disk
>>> on each of this TaskManager node?
>>>
>>> - if the local node/disk fails I will get the state back from the
>>> distributed disk and things will start again and all is fine. However what
>>> happens if the distributed disk fails? Will Flink continue processing
>>> waiting for me to mount a new distributed disk? Or will it stop? May I lose
>>> data/reprocess things under that condition?
>>>
>>> --
>>> Christophe Jolif
>>>
>>


Re: RocksDB / checkpoint questions

2018-02-05 Thread Stefan Richter
Hi,

you are correct that RocksDB has a „working directory“ on local disk and 
checkpoints + savepoints go to a distributed filesystem.

> - if I have 3 TaskManager I should expect more or less (depending on how the 
> tasks are balanced) to find a third of my overall state stored on disk on each 
> of this TaskManager node?

This question is not so much about RocksDB, but more about Flink’s keyBy 
partitioning, i.e. how work is distributed between the parallel instances of an 
operator, and the answer is that it will apply hash partitioning based on your 
event keys to distribute the keys (and their state) between your 3 nodes. If 
your key space is very skewed or there are heavy hitter keys with much larger 
state than most other keys, this can lead to some imbalances. If your keys are 
not skewed and have similar state size, every node should have roughly the same 
state size.
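
A simplified sketch of how such a key-based assignment can work is shown
below. It follows the two-step scheme of mapping a key to a key group and the
key group to a parallel operator instance. The hash mixing here is a
stand-in chosen for brevity (Flink, to my knowledge, applies a murmur hash to
the key's hashCode), and the class and method names are hypothetical.

```java
class KeyGroupSketch {

    // Maps a key to a key group in [0, maxParallelism).
    // Stand-in hash: a real implementation would use a stronger mixing function.
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return Math.floorMod(h, maxParallelism);
    }

    // Maps a key group to the index of the parallel instance that owns it.
    // Each instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many of numKeys synthetic keys land on each instance.
    static int[] countKeysPerInstance(int numKeys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < numKeys; i++) {
            int keyGroup = keyGroupFor("user-" + i, maxParallelism);
            counts[operatorIndexFor(keyGroup, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 3 parallel instances, 128 key groups, 30000 synthetic keys.
        int[] counts = countKeysPerInstance(30000, 128, 3);
        for (int i = 0; i < counts.length; i++) {
            System.out.println("instance " + i + ": " + counts[i] + " keys");
        }
    }
}
```

The counts only show how many keys each instance owns. If a few keys carry
much more state than the rest, the on-disk sizes can still differ a lot.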

> - if the local node/disk fails I will get the state back from the distributed 
> disk and things will start again and all is fine. However what happens if the 
> distributed disk fails? Will Flink continue processing waiting for me to mount 
> a new distributed disk? Or will it stop? May I lose data/reprocess things under 
> that condition? 

Starting from Flink 1.5, this is configurable, please see 
https://issues.apache.org/jira/browse/FLINK-4809 and 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html
in section „fail/continue task on checkpoint errors“. If you tolerate 
checkpoint failures, you will not lose data: if your job fails, it can recover 
from the latest successful checkpoint once your DFS is available again. If the 
job does not fail, it will eventually make another checkpoint once DFS is back. 
If you do not tolerate checkpoint failures, your job will simply fail and 
restart from the last successful checkpoint and recover once DFS is back.

Best,
Stefan

> Am 03.02.2018 um 17:45 schrieb Christophe Jolif :
> 
> Thanks for sharing Kien. Sounds like the logical behavior but good to hear it 
> is confirmed by your experience.
> 
> --
> Christophe
> 
> On Sat, Feb 3, 2018 at 7:25 AM, Kien Truong  > wrote:
> 
> 
> Sent from TypeApp 
> On Feb 3, 2018, at 10:48, Kien Truong  > wrote:
> Hi, 
> Speaking from my experience, if the distributed disk fail, the checkpoint 
> will fail as well, but the job will continue running. The checkpoint 
> scheduler will keep running, so the first scheduled checkpoint after you 
> repair your disk should succeed. 
> 
> Of course, if you also write to the distributed disk inside your job, then 
> your job may crash too, but this is unrelated to the checkpoint process. 
> 
> Best regards, 
> Kien 
> 
> Sent from TypeApp 
> On Feb 2, 2018, at 23:30, Christophe Jolif < cjo...@gmail.com 
> > wrote:
> If I understand well RocksDB is using two disk, the Task Manager local disk 
> for "local storage" of the state and the distributed disk for checkpointing.
> 
> Two questions:
> 
> - if I have 3 TaskManager I should expect more or less (depending on how the 
> tasks are balanced) to find a third of my overall state stored on disk on 
> each of this TaskManager node?
> 
> - if the local node/disk fails I will get the state back from the distributed 
> disk and things will start again and all is fine. However what happens if the 
> distributed disk fails? Will Flink continue processing waiting for me to 
> mount a new distributed disk? Or will it stop? May I lose data/reprocess 
> things under that condition? 
> 
> -- 
> Christophe Jolif
> 



Re: RocksDB / checkpoint questions

2018-02-05 Thread Christophe Jolif
Thanks a lot for the details Stefan.

--
Christophe

On Mon, Feb 5, 2018 at 11:31 AM, Stefan Richter  wrote:

> Hi,
>
> you are correct that RocksDB has a „working directory“ on local disk and
> checkpoints + savepoints go to a distributed filesystem.
>
> - if I have 3 TaskManager I should expect more or less (depending on how
> the tasks are balanced) to find a third of my overall state stored on disk
> on each of this TaskManager node?
>
> This question is not so much about RocksDB, but more about Flink’s keyBy
> partitioning, i.e. how work is distributed between the parallel instances
> of an operator, and the answer is that it will apply hash partitioning
> based on your event keys to distribute the keys (and their state) between
> your 3 nodes. If your key space is very skewed or there are heavy hitter
> keys with much larger state than most other keys, this can lead to some
> imbalances. If your keys are not skewed and have similar state size, every
> node should have roughly the same state size.
>
> - if the local node/disk fails I will get the state back from the
> distributed disk and things will start again and all is fine. However what
> happens if the distributed disk fails? Will Flink continue processing
> waiting for me to mount a new distributed disk? Or will it stop? May I lose
> data/reprocess things under that condition?
>
> Starting from Flink 1.5, this is configurable, please see
> https://issues.apache.org/jira/browse/FLINK-4809 and
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html
> in section „*fail/continue task on checkpoint errors*“.
> If you tolerate checkpoint failures, you will not lose data: if your job
> fails, it can recover from the latest successful checkpoint once your DFS
> is available again. If the job does not fail, it will eventually make
> another checkpoint once DFS is back. If you do not tolerate checkpoint
> failures, your job will simply fail and restart from the last successful
> checkpoint and recover once DFS is back.
>
> Best,
> Stefan
>
> Am 03.02.2018 um 17:45 schrieb Christophe Jolif :
>
> Thanks for sharing Kien. Sounds like the logical behavior but good to hear
> it is confirmed by your experience.
>
> --
> Christophe
>
> On Sat, Feb 3, 2018 at 7:25 AM, Kien Truong 
> wrote:
>
>>
>>
>> Sent from TypeApp 
>> On Feb 3, 2018, at 10:48, Kien Truong  wrote:
>>>
>>> Hi,
>>> Speaking from my experience, if the distributed disk fail, the
>>> checkpoint will fail as well, but the job will continue running. The
>>> checkpoint scheduler will keep running, so the first scheduled checkpoint
>>> after you repair your disk should succeed.
>>>
>>> Of course, if you also write to the distributed disk inside your job,
>>> then your job may crash too, but this is unrelated to the checkpoint
>>> process.
>>>
>>> Best regards,
>>> Kien
>>>
>>> Sent from TypeApp 
>>> On Feb 2, 2018, at 23:30, Christophe Jolif < cjo...@gmail.com> wrote:

>>>> If I understand well RocksDB is using two disk, the Task Manager local
>>>> disk for "local storage" of the state and the distributed disk for
>>>> checkpointing.
>>>>
>>>> Two questions:
>>>>
>>>> - if I have 3 TaskManager I should expect more or less (depending on
>>>> how the tasks are balanced) to find a third of my overall state stored on
>>>> disk on each of this TaskManager node?
>>>>
>>>> - if the local node/disk fails I will get the state back from the
>>>> distributed disk and things will start again and all is fine. However what
>>>> happens if the distributed disk fails? Will Flink continue processing
>>>> waiting for me to mount a new distributed disk? Or will it stop? May I lose
>>>> data/reprocess things under that condition?
>>>>
>>>> --
>>>> Christophe Jolif

>>>
>


Re: Rocksdb in production

2018-03-05 Thread Fabian Hueske
Hi,
RocksDB is an embedded key-value store that is used internally by Flink.
There is no need to set up a RocksDB database or service yourself. All of
that is done by Flink.
As a Flink user who uses the RocksDB state backend, you won't get in touch
with RocksDB itself.

Besides that, RocksDB is developed by Facebook [1] and is a fairly active
project.

Best, Fabian

[1] https://github.com/facebook/rocksdb

2018-03-05 3:57 GMT-08:00 Jayant Ameta :

> Hi,
> I wanted to know how's the online support and resources for RocksDB? I
> want to use RocksDB as the state backend, but I'm not sure how active the
> community is. Can anyone vouch for it?
>


Re: Rocksdb in production

2018-03-05 Thread Jayant Ameta
Oh! Somehow I missed, while reading the documentation, that RocksDB is
embedded in Flink.

Also, irrespective of the state backend being filesystem or RocksDB, I'll have
to set up a shared filesystem (HDFS, S3, etc). Is my understanding correct?


Jayant Ameta

On Mon, Mar 5, 2018 at 9:51 PM, Fabian Hueske  wrote:

> Hi,
> RockDB is an embedded key-value storage that is internally used by Flink.
> There is no need to setup a RocksDB database or service yourself. All of
> that is done by Flink.
> As a Flink user that uses the RockDB state backend, you won't get in touch
> with RocksDB itself.
>
> Besides that, RocksDB is developed by Facebook [1] and is a fairly active
> project.
>
> Best, Fabian
>
> [1] https://github.com/facebook/rocksdb
>
> 2018-03-05 3:57 GMT-08:00 Jayant Ameta :
>
>> Hi,
>> I wanted to know how's the online support and resources for RocksDB? I
>> want to use RocksDB as the state backend, but I'm not sure how active the
>> community is. Can anyone vouch for it?
>>
>
>


Re: Rocksdb in production

2018-03-05 Thread Fabian Hueske
Yes, that is correct.

2018-03-05 8:57 GMT-08:00 Jayant Ameta :

> Oh! Somehow I missed while reading the documentation that RocksDB is
> embedded in Flink.
>
> Also, irrespective of state backend being filesystem or rocksdb, I'll have
> to setup a shared filesystem (HDFS, S3, etc). Is my understanding correct?
>
>
> Jayant Ameta
>
> On Mon, Mar 5, 2018 at 9:51 PM, Fabian Hueske  wrote:
>
>> Hi,
>> RockDB is an embedded key-value storage that is internally used by Flink.
>> There is no need to setup a RocksDB database or service yourself. All of
>> that is done by Flink.
>> As a Flink user that uses the RockDB state backend, you won't get in
>> touch with RocksDB itself.
>>
>> Besides that, RocksDB is developed by Facebook [1] and is a fairly active
>> project.
>>
>> Best, Fabian
>>
>> [1] https://github.com/facebook/rocksdb
>>
>> 2018-03-05 3:57 GMT-08:00 Jayant Ameta :
>>
>>> Hi,
>>> I wanted to know how's the online support and resources for RocksDB? I
>>> want to use RocksDB as the state backend, but I'm not sure how active the
>>> community is. Can anyone vouch for it?
>>>
>>
>>
>


Re: Rocksdb in production

2018-03-06 Thread Jayant Ameta
Thanks Fabian.
Will there be any performance issues if I use NFS as the shared filesystem
(as compared to HDFS or S3)?

Jayant Ameta

On Mon, Mar 5, 2018 at 10:31 PM, Fabian Hueske  wrote:

> Yes, that is correct.
>
> 2018-03-05 8:57 GMT-08:00 Jayant Ameta :
>
>> Oh! Somehow I missed while reading the documentation that RocksDB is
>> embedded in Flink.
>>
>> Also, irrespective of state backend being filesystem or rocksdb, I'll
>> have to setup a shared filesystem (HDFS, S3, etc). Is my understanding
>> correct?
>>
>>
>> Jayant Ameta
>>
>> On Mon, Mar 5, 2018 at 9:51 PM, Fabian Hueske  wrote:
>>
>>> Hi,
>>> RockDB is an embedded key-value storage that is internally used by
>>> Flink. There is no need to setup a RocksDB database or service yourself.
>>> All of that is done by Flink.
>>> As a Flink user that uses the RockDB state backend, you won't get in
>>> touch with RocksDB itself.
>>>
>>> Besides that, RocksDB is developed by Facebook [1] and is a fairly
>>> active project.
>>>
>>> Best, Fabian
>>>
>>> [1] https://github.com/facebook/rocksdb
>>>
>>> 2018-03-05 3:57 GMT-08:00 Jayant Ameta :
>>>
>>>> Hi,
>>>> I wanted to know how's the online support and resources for RocksDB? I
>>>> want to use RocksDB as the state backend, but I'm not sure how active the
>>>> community is. Can anyone vouch for it?

>>>
>>>
>>
>


Re: Rocksdb in production

2018-03-06 Thread Fabian Hueske
That depends on your job and the setup.
Remember that all operators will write their checkpoint data into that file
system.
If the state grows very large and you only have an NFS with low write
throughput, it might be a problem. But the same would apply to HDFS as
well.

2018-03-06 2:51 GMT-08:00 Jayant Ameta :

> Thanks Fabian.
> Will there be any performance issues if I use NFS as the shared filesystem
> (as compared to HDFS or S3)?
>
> Jayant Ameta
>
> On Mon, Mar 5, 2018 at 10:31 PM, Fabian Hueske  wrote:
>
>> Yes, that is correct.
>>
>> 2018-03-05 8:57 GMT-08:00 Jayant Ameta :
>>
>>> Oh! Somehow I missed while reading the documentation that RocksDB is
>>> embedded in Flink.
>>>
>>> Also, irrespective of state backend being filesystem or rocksdb, I'll
>>> have to setup a shared filesystem (HDFS, S3, etc). Is my understanding
>>> correct?
>>>
>>>
>>> Jayant Ameta
>>>
>>> On Mon, Mar 5, 2018 at 9:51 PM, Fabian Hueske  wrote:
>>>
>>>> Hi,
>>>> RockDB is an embedded key-value storage that is internally used by
>>>> Flink. There is no need to setup a RocksDB database or service yourself.
>>>> All of that is done by Flink.
>>>> As a Flink user that uses the RockDB state backend, you won't get in
>>>> touch with RocksDB itself.
>>>>
>>>> Besides that, RocksDB is developed by Facebook [1] and is a fairly
>>>> active project.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://github.com/facebook/rocksdb
>>>>
>>>> 2018-03-05 3:57 GMT-08:00 Jayant Ameta :
>>>>
>>>>> Hi,
>>>>> I wanted to know how's the online support and resources for RocksDB? I
>>>>> want to use RocksDB as the state backend, but I'm not sure how active the
>>>>> community is. Can anyone vouch for it?
>>>>>


>>>
>>
>


Re: RocksDB KeyValue store

2019-07-29 Thread taher koitawala
I believe Flink serialization is really fast, and GC behaviour has been much
better since the Flink 1.6 release. Beyond that, the cost of state depends on
what you do with it.
Each task manager has its own instance of RocksDB and is responsible for
snapshotting its own instance on checkpointing.

Furthermore, the load is distributed across TMs only if you use keyed state,
because each TM then serves specific key groups. If you use a list
or a value state, things differ depending on what you need to
store and how you access or update the values.

On Tue, Jul 30, 2019, 12:09 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> I looked at the RocksDB KV store implementation and I found that
> deserialization has to happen for each key lookup. Given a scenario where
> the key lookup has to happen for every single message would it still be a
> good idea to store it in rocksdb store or would in-memory store/cache be
> more efficient? I know if the data is stored in KV store it will
> automatically distribute when scale up/scale down happens and its fault
> tolerant.
>
> For example, if there are 1M user events and a user config of size 1KB is
> persisted into rocksdb then for each event would the state have to be
> deserialized? Wouldn't this create so many garbage?
>
> Also, is there is per machine sort of state store which can be used for
> all keys that are sent to that task manager?
>
> Thanks
>


Re: Rocksdb Incremental checkpoint

2022-12-19 Thread Hangxiang Yu
Hi,
IIUC, numRetainedCheckpoints only influences the space overhead of the
checkpoint dir, not the incremental size.
RocksDB executes incremental checkpoints based on the shared directory, which
always retains as many SST files as possible (maybe from the last
checkpoint, or maybe from long ago).
numRetainedCheckpoints just makes Flink retain more cp-x directories, plus SST
files in the shared directory that are not used by the next incremental
checkpoint.
Whether it's 1 or 3, the size of the incremental checkpoint should be
similar.
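
The reasoning can be illustrated with a toy simulation, under simplifying
assumptions: SST files are immutable and identified by name, each checkpoint
is just the set of live SST files at that moment, and a file is uploaded only
if no retained checkpoint already references it. The class and method names
are hypothetical and this is not how Flink's shared state registry is
implemented in detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IncrementalCheckpointSketch {

    // Number of newly uploaded SST files for each checkpoint in sequence.
    static List<Integer> uploadsPerCheckpoint(List<Set<String>> liveSstPerCheckpoint,
                                              int numRetained) {
        Deque<Set<String>> retained = new ArrayDeque<>();
        List<Integer> uploads = new ArrayList<>();
        for (Set<String> live : liveSstPerCheckpoint) {
            // Files already present in the shared directory.
            Set<String> shared = new HashSet<>();
            for (Set<String> checkpoint : retained) {
                shared.addAll(checkpoint);
            }
            Set<String> fresh = new HashSet<>(live);
            fresh.removeAll(shared);
            uploads.add(fresh.size());

            retained.addLast(live);
            if (retained.size() > numRetained) {
                retained.removeFirst(); // oldest checkpoint is subsumed
            }
        }
        return uploads;
    }

    // Number of distinct SST files still referenced after the last checkpoint.
    static int sharedFilesAtEnd(List<Set<String>> liveSstPerCheckpoint, int numRetained) {
        Deque<Set<String>> retained = new ArrayDeque<>();
        for (Set<String> live : liveSstPerCheckpoint) {
            retained.addLast(live);
            if (retained.size() > numRetained) {
                retained.removeFirst();
            }
        }
        Set<String> shared = new HashSet<>();
        for (Set<String> checkpoint : retained) {
            shared.addAll(checkpoint);
        }
        return shared.size();
    }

    // Four checkpoints; between the second and third, files a and b are compacted into d.
    static List<Set<String>> example() {
        List<Set<String>> checkpoints = new ArrayList<>();
        checkpoints.add(new HashSet<>(Arrays.asList("a", "b")));
        checkpoints.add(new HashSet<>(Arrays.asList("a", "b", "c")));
        checkpoints.add(new HashSet<>(Arrays.asList("c", "d")));
        checkpoints.add(new HashSet<>(Arrays.asList("c", "d", "e")));
        return checkpoints;
    }

    public static void main(String[] args) {
        System.out.println("uploads, retained=1: " + uploadsPerCheckpoint(example(), 1));
        System.out.println("uploads, retained=3: " + uploadsPerCheckpoint(example(), 3));
        System.out.println("shared files, retained=1: " + sharedFilesAtEnd(example(), 1));
        System.out.println("shared files, retained=3: " + sharedFilesAtEnd(example(), 3));
    }
}
```

In this toy run the uploads per checkpoint are the same for both settings,
while the number of files kept in the shared directory is larger with three
retained checkpoints (5 files versus 3).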

Could you check your configuration, source status, job status, etc again
to find whether there are any other differences ?

On Mon, Dec 19, 2022 at 9:00 PM Puneet Duggal 
wrote:

> Hi,
>
> After going through the following article regarding rocksdb incremental
> checkpoint (
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html),
> my understanding was that at each checkpoint, flink only checkpoints newly
> created SSTables whereas other it can reference from earlier checkpoints
> (depending upon num of retained checkpoints).
>
> So can we assume from this that if numRetainedCheckpoints = 1 (default),
> the behaviour is similar to checkpointing the complete data as it is (same as
> non-incremental checkpointing)?
>
> Also performed a load test by running exactly same flink job on 2
> different clusters. Only difference between all these clusters were
> numOfRetained checkpoints.
>
> Incremental Checkpoint Load Test
>
> Cluster 1
>
> num Retained Checkpoints = 3
>
>
> Cluster 2
>
> num Retained Checkpoints = 1
>
>
>
> As we can see, checkpoint data size for cluster with num of retained
> checkpoints = 1 is less than one with greater number of retained
> checkpoints.
>
>
>

-- 
Best,
Hangxiang.


Re: rocksdb block cache usage

2021-01-27 Thread Chesnay Schepler
I don't quite understand the question; all 3 metrics you listed are the 
same one?


On 1/27/2021 9:23 AM, 曾祥才 wrote:

hi, all
I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric,
and the
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
metric is exposed.
I'm confused whether the total memory used for pinned block cache is the
sum of
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
or just
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
(for block cache usage the metric seems per slot)?







Re: rocksdb block cache usage

2021-01-27 Thread Yun Tang
Hi,

If you have enabled managed memory, all RocksDB instances within one slot
share the same block cache, so every
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
metric in the same slot reports the same value.
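
Put differently, the values should be counted once per slot rather than
summed over every operator. A minimal Java sketch of that aggregation
follows; the Sample class and its slotId field are hypothetical (the exported
metric does not necessarily carry such a label, so the mapping from operator
subtask to slot is assumed to be known).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only; Sample and slotId are assumptions, not Flink types.
class BlockCachePinnedUsage {

    /** One reported value of the pinned-usage metric. */
    static final class Sample {
        final String slotId;
        final String operator;
        final long bytes;

        Sample(String slotId, String operator, long bytes) {
            this.slotId = slotId;
            this.operator = operator;
            this.bytes = bytes;
        }
    }

    /** Counts the shared cache once per slot, then sums across slots. */
    static long totalPinnedBytes(List<Sample> samples) {
        Map<String, Long> perSlot = new LinkedHashMap<>();
        for (Sample s : samples) {
            // Samples from one slot describe the same cache; max() tolerates
            // small differences between scrape times.
            perSlot.merge(s.slotId, s.bytes, Long::max);
        }
        long total = 0;
        for (long v : perSlot.values()) {
            total += v;
        }
        return total;
    }

    /** Two operators in slot-0 share a cache; slot-1 hosts a single operator. */
    static List<Sample> exampleSamples() {
        List<Sample> samples = new ArrayList<>();
        samples.add(new Sample("slot-0", "window-a", 100L));
        samples.add(new Sample("slot-0", "window-b", 100L));
        samples.add(new Sample("slot-1", "window-a", 40L));
        return samples;
    }

    public static void main(String[] args) {
        System.out.println(totalPinnedBytes(exampleSamples()));
    }
}
```

For the example samples this yields 140 bytes, whereas a naive sum over all
three samples would report 240.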


Best
Yun Tang

From: Chesnay Schepler 
Sent: Wednesday, January 27, 2021 20:59
To: 曾祥才 ; User-Flink 
Subject: Re: rocksdb block cache usage

I don't quite understand the question; all 3 metrics you listed are the same 
one?

On 1/27/2021 9:23 AM, 曾祥才 wrote:
hi, all
I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric,
and the
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
metric is exposed.
I'm confused whether the total memory used for pinned block cache is the sum of
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
or just
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
(for block cache usage the metric seems per slot)?






Re: RocksDB StateBuilder unexpected exception

2021-03-23 Thread Matthias Pohl
Hi Dhanesh,
thanks for reaching out to the Flink community. Checking the code, it looks
like the OutputStream is added to a CloseableRegistry before writing to it
[1].

My suspicion is - based on the exception cause - that the CloseableRegistry
got triggered while restoring the state. I tried to track down the source
of the CloseableRegistry. It looks like it's handed down from the
StreamTask [2].

The StreamTask closes the CloseableRegistry either when cancelling is
triggered or in the class' finalize method. Have you checked the logs to
see whether there was some task cancellation logged?
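
The failure mode described above can be reproduced outside Flink with plain
java.nio. The sketch below is not Flink code: SimpleRegistry is a made-up
stand-in for the CloseableRegistry, and closing it plays the role of a task
cancellation that happens while a download is still writing.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Standalone reproduction; SimpleRegistry is hypothetical, not Flink's class.
class ClosedRegistryDemo {

    /** Closes everything that was registered with it. */
    static final class SimpleRegistry implements Closeable {
        private final List<Closeable> registered = new ArrayList<>();

        void register(Closeable c) {
            registered.add(c);
        }

        @Override
        public void close() throws IOException {
            for (Closeable c : registered) {
                c.close();
            }
        }
    }

    /** Returns true if a write after closing the registry fails as in the log. */
    static boolean writeFailsAfterRegistryClose() {
        try {
            Path file = Files.createTempFile("download", ".sst");
            try {
                SimpleRegistry registry = new SimpleRegistry();
                FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE);
                OutputStream out = Channels.newOutputStream(channel);
                registry.register(out);

                out.write(new byte[] {1, 2, 3}); // fine while the registry is open
                registry.close();                // stands in for a task cancellation

                try {
                    out.write(new byte[] {4, 5, 6});
                    return false;
                } catch (ClosedChannelException expected) {
                    return true;
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("write failed after close: " + writeFailsAfterRegistryClose());
    }
}
```

If this is what happened in the job, the ClosedChannelException would be a
symptom of an earlier cancellation rather than its root cause.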

Best,
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
[2]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269

On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole 
wrote:

> Hello Hivemind,
>
> We are running a stateful streaming job. Each task manager instance hosts
> around 100GB of data. During restart of task managers we encountered the
> following errors, because of which the job is not able to restart.
> Initially we thought it might be due to failing status checks of attached
> EBS volumes or burst balance exhaustion, but the AWS console is not indicating
> any issue with EBS volumes. Is there anything else that we need to
> look at which can potentially cause this exception? Also, it's quite unclear
> what exactly is the cause of the exception; any help on that would be much
> appreciated.
>
> Flink version: 1.12.2_scala_2.11
> Environment: Kubernetes on AWS
> Volume Type: EBS, gp2 300GiB
>
> *ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder []
> - Caught unexpected exception.*
> *java.nio.channels.ClosedChannelException: null*
> * at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> ~[?:?]*
> * at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]*
> * at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]*
> * at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]*
> * at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
> ~[?:?]*
> * at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]*
> * at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]*
> * at java.lang.Thread.run(Thread.java:830) [?:?]*
> *2021-03-19 15:26:10,385 WARN
>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Exception while restoring keyed state backend for
> KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from
> alternative (1/1), will retry while more alternatives are available.*
> *org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> 

Re: RocksDB StateBuilder unexpected exception

2021-03-23 Thread dhanesh arole
Hi Matthias,

Thanks for taking the time to help us with this.

You are right, there were lots of task cancellations, as this exception
causes the job to get restarted, triggering cancellations.


-
Dhanesh Arole


On Tue, Mar 23, 2021 at 9:27 AM Matthias Pohl 
wrote:

> Hi Dhanesh,
> thanks for reaching out to the Flink community. Checking the code, it
> looks like the OutputStream is added to a CloseableRegistry before writing
> to it [1].
>
> My suspicion is - based on the exception cause - that the
> CloseableRegistry got triggered while restoring the state. I tried to track
> down the source of the CloseableRegistry. It looks like it's handed down
> from the StreamTask [2].
>
> The StreamTask closes the CloseableRegistry either when cancelling is
> triggered or in the class' finalize method. Have you checked the logs to
> see whether there was some task cancellation logged?
>
> Best,
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
> [2]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269
>
> On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole 
> wrote:
>
>> Hello Hivemind,
>>
>> We are running a stateful streaming job. Each task manager instance hosts
>> around 100GB of data. During restart of task managers we encountered the
>> following errors, because of which the job is not able to restart.
>> Initially we thought it might be due to failing status checks of attached
>> EBS volumes or burst balance exhaustion, but the AWS console is not indicating
>> any issue with EBS volumes. Is there anything else that we need to
>> look at which can potentially cause this exception? Also, it's quite unclear
>> what exactly is the cause of the exception; any help on that would be much
>> appreciated.
>>
>> Flink version: 1.12.2_scala_2.11
>> Environment: Kubernetes on AWS
>> Volume Type: EBS, gp2 300GiB
>>
>> *ERROR
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder []
>> - Caught unexpected exception.*
>> *java.nio.channels.ClosedChannelException: null*
>> * at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>> ~[?:?]*
>> * at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]*
>> * at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]*
>> * at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]*
>> * at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ~[?:?]*
>> * at java.lang.Thread.run(Thread.java:830) [?:?]*
>> *2021-03-19 15:26:10,385 WARN
>>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
>> Exception while restoring keyed state backend for
>> KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from
>> alternative (1/1), will retry while more alternatives are available.*
>> *org.apache.flink.runtime.state.BackendBuildingException: Caught
>> unexpected exception.*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * 

Re: RocksDB CPU resource usage

2021-06-15 Thread JING ZHANG
Hi Padarn,
After switching the state backend from filesystem to rocksdb, all reads/writes
from/to the backend have to go through de-/serialization to retrieve/store the
state objects, which may cost more CPU.
But I'm not sure that is the main reason for the 3x CPU cost in your job.
To find out the reason, we need to profile the CPU cost more, for example with
Flame Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively supported
in Flink [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
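
To show where the extra de-/serialization work comes from, here is a toy
comparison in Java. It does not use Flink or RocksDB: the two maps merely
stand in for a heap-based store that holds objects and a byte-oriented store
that holds serialized values, and all names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Toy comparison only; real state backends do far more than this.
class SerializedStateCost {

    /** Number of serialize/deserialize calls performed so far. */
    static int serdeCalls = 0;

    /** Encodes a {sum, count} accumulator as 16 bytes. */
    static byte[] serialize(long[] acc) {
        serdeCalls++;
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(acc[0]);
            out.writeLong(acc[1]);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decodes a {sum, count} accumulator written by serialize(). */
    static long[] deserialize(byte[] raw) {
        serdeCalls++;
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
            return new long[] {in.readLong(), in.readLong()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Heap-style store: the accumulator object is updated in place. */
    static long[] aggregateOnHeap(String key, long[] events) {
        Map<String, long[]> state = new HashMap<>();
        for (long e : events) {
            long[] acc = state.computeIfAbsent(key, k -> new long[2]);
            acc[0] += e;
            acc[1]++;
        }
        return state.get(key);
    }

    /** Byte-store style: each event pays for a read and a write of the value. */
    static long[] aggregateOnBytes(String key, long[] events) {
        Map<String, byte[]> state = new HashMap<>();
        for (long e : events) {
            byte[] raw = state.get(key);
            long[] acc = raw == null ? new long[2] : deserialize(raw);
            acc[0] += e;
            acc[1]++;
            state.put(key, serialize(acc));
        }
        return deserialize(state.get(key));
    }

    public static void main(String[] args) {
        long[] events = {5, 7, 9};
        aggregateOnHeap("user-1", events);
        System.out.println("heap serde calls: " + serdeCalls);
        aggregateOnBytes("user-1", events);
        System.out.println("byte-store serde calls: " + serdeCalls);
    }
}
```

For three events the heap variant performs no serialization at all, while
the byte-store variant performs six calls. A real RocksDB backend also
serializes keys, crosses JNI and runs compaction, so this only shows one of
the contributors.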

Best,
JING ZHANG

On Tue, Jun 15, 2021 at 5:05 PM Padarn Wilson wrote:

> Hi all,
>
> We have a job that we just enabled rocksdb on (instead of file backend),
> and see that the CPU usage is almost 3x greater (we had to increase
> taskmanagers 3x to get it to run).
>
> I don't really understand this, is there something we can look at to
> understand why CPU use is so high? Our state mostly consists of aggregation
> windows.
>
> Cheers,
> Padarn
>


Re: RocksDB CPU resource usage

2021-06-16 Thread Robert Metzger
Depending on the datatypes you are using, seeing 3x more CPU usage seems
realistic.
Serialization can be quite expensive. See also:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
Maybe it makes sense to optimize there a bit.

On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:

> Hi Padarn,
> After switching the state backend from filesystem to rocksdb, all reads/writes
> from/to the backend have to go through de-/serialization to retrieve/store the
> state objects, which may cost more CPU.
> But I'm not sure that is the main reason for the 3x CPU cost in your job.
> To find out the reason, we need to profile the CPU cost more, for example with
> Flame Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively
> supported in Flink [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> Best,
> JING ZHANG
>
> On Tue, Jun 15, 2021 at 5:05 PM Padarn Wilson wrote:
>
>> Hi all,
>>
>> We have a job that we just enabled rocksdb on (instead of file backend),
>> and see that the CPU usage is almost 3x greater (we had to increase
>> taskmanagers 3x to get it to run).
>>
>> I don't really understand this, is there something we can look at to
>> understand why CPU use is so high? Our state mostly consists of aggregation
>> windows.
>>
>> Cheers,
>> Padarn
>>
>


Re: RocksDB CPU resource usage

2021-06-16 Thread Padarn Wilson
Thanks Robert. I think it would be easy enough to test this hypothesis by
making the same comparison with some simpler state inside the aggregation
window.

On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:

> Depending on the datatypes you are using, seeing 3x more CPU usage seems
> realistic.
> Serialization can be quite expensive. See also:
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> Maybe it makes sense to optimize there a bit.
>
> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>
>> Hi Padarn,
>> After switching the state backend from filesystem to rocksdb, all reads/writes
>> from/to the backend have to go through de-/serialization to retrieve/store the
>> state objects, which may cost more CPU.
>> But I'm not sure that is the main reason for the 3x CPU cost in your job.
>> To find out the reason, we need to profile the CPU cost more, for example with
>> Flame Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively
>> supported in Flink [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>
>> Best,
>> JING ZHANG
>>
>> On Tue, Jun 15, 2021 at 5:05 PM Padarn Wilson wrote:
>>
>>> Hi all,
>>>
>>> We have a job that we just enabled rocksdb on (instead of file backend),
>>> and see that the CPU usage is almost 3x greater (we had to increase
>>> taskmanagers 3x to get it to run).
>>>
>>> I don't really understand this, is there something we can look at to
>>> understand why CPU use is so high? Our state mostly consists of aggregation
>>> windows.
>>>
>>> Cheers,
>>> Padarn
>>>
>>


Re: RocksDB CPU resource usage

2021-06-16 Thread Robert Metzger
If you are able to execute your job locally as well (with enough data), you
can also run it with a profiler and see the CPU cycles spent on
serialization (you can also use RocksDB locally)

On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson  wrote:

> Thanks Robert. I think it would be easy enough to test this hypothesis by
> making the same comparison with some simpler state inside the aggregation
> window.
>
> On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:
>
>> Depending on the datatypes you are using, seeing 3x more CPU usage seems
>> realistic.
>> Serialization can be quite expensive. See also:
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> Maybe it makes sense to optimize there a bit.
>>
>> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>>
>>> Hi Padarn,
>>> After switching the state backend from filesystem to rocksdb, all reads/writes
>>> from/to the backend have to go through de-/serialization to retrieve/store the
>>> state objects, which may cost more CPU.
>>> But I'm not sure that is the main reason for the 3x CPU cost in your job.
>>> To find out the reason, we need to profile the CPU cost more, for example with
>>> Flame Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively
>>> supported in Flink [1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Tue, Jun 15, 2021 at 5:05 PM Padarn Wilson wrote:
>>>
>>>> Hi all,
>>>>
>>>> We have a job that we just enabled rocksdb on (instead of file
>>>> backend), and see that the CPU usage is almost 3x greater (we had to
>>>> increase taskmanagers 3x to get it to run).
>>>>
>>>> I don't really understand this, is there something we can look at to
>>>> understand why CPU use is so high? Our state mostly consists of aggregation
>>>> windows.
>>>>
>>>> Cheers,
>>>> Padarn
>>>>
>>>


Re: RocksDB CPU resource usage

2021-06-17 Thread Yun Tang
Hi Padarn,

From my experience, de-/serialization alone might not account for 3x CPU usage;
background compaction could also increase the CPU usage. You could use
async-profiler [1] to figure out what is really consuming your CPU, as it
can also capture the native RocksDB thread stacks.


[1] https://github.com/jvm-profiling-tools/async-profiler

Best
Yun Tang


From: Robert Metzger 
Sent: Thursday, June 17, 2021 14:11
To: Padarn Wilson 
Cc: JING ZHANG ; user 
Subject: Re: RocksDB CPU resource usage

If you are able to execute your job locally as well (with enough data), you can 
also run it with a profiler and see the CPU cycles spent on serialization (you 
can also use RocksDB locally)

On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson wrote:
Thanks Robert. I think it would be easy enough to test this hypothesis by 
making the same comparison with some simpler state inside the aggregation 
window.

On Wed, 16 Jun 2021, 7:58 pm Robert Metzger wrote:
Depending on the datatypes you are using, seeing 3x more CPU usage seems 
realistic.
Serialization can be quite expensive. See also: 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html 
Maybe it makes sense to optimize there a bit.

On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG wrote:
Hi Padarn,
After switching the state backend from filesystem to rocksdb, all reads/writes
from/to the backend have to go through de-/serialization to retrieve/store the
state objects, which may cost more CPU.
But I'm not sure that is the main reason for the 3x CPU cost in your job.
To find out the reason, we need to profile the CPU cost more, for example with
Flame Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively
supported in Flink [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/

Best,
JING ZHANG

On Tue, Jun 15, 2021 at 5:05 PM Padarn Wilson wrote:
Hi all,

We have a job that we just enabled rocksdb on (instead of file backend), and
see that the CPU usage is almost 3x greater (we had to increase taskmanagers
3x to get it to run).

I don't really understand this, is there something we can look at to understand 
why CPU use is so high? Our state mostly consists of aggregation windows.

Cheers,
Padarn


Re: RocksDB CPU resource usage

2021-06-17 Thread Padarn Wilson
Thanks both for the suggestions, all good ideas. I will try some of the
profiling suggestions and report back.

On Thu, Jun 17, 2021 at 4:13 PM Yun Tang  wrote:

> Hi Padarn,
>
> From my experience, de-/serialization alone might not account for 3x CPU
> usage; background compaction could also increase the CPU usage. You could use
> async-profiler [1] to figure out what is really consuming your CPU, as it
> can also capture the native RocksDB thread stacks.
>
>
> [1] https://github.com/jvm-profiling-tools/async-profiler
>
> Best
> Yun Tang
>
> --
> *From:* Robert Metzger 
> *Sent:* Thursday, June 17, 2021 14:11
> *To:* Padarn Wilson 
> *Cc:* JING ZHANG ; user 
> *Subject:* Re: RocksDB CPU resource usage
>
> If you are able to execute your job locally as well (with enough data),
> you can also run it with a profiler and see the CPU cycles spent on
> serialization (you can also use RocksDB locally)
>
> On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson  wrote:
>
> Thanks Robert. I think it would be easy enough to test this hypothesis by
> making the same comparison with some simpler state inside the aggregation
> window.
>
> On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:
>
> Depending on the datatypes you are using, seeing 3x more CPU usage seems
> realistic.
> Serialization can be quite expensive. See also:
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> Maybe it makes sense to optimize there a bit.
>
> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>
> Hi Padarn,
> After switching the state backend from filesystem to rocksdb, all reads/writes
> from/to the backend have to go through de-/serialization to retrieve/store the
> state objects, which may cost more CPU.
> But I'm not sure that is the main reason for the 3x CPU cost in your job.
> To find out the reason, we need to profile the CPU cost more, for example with
> Flame Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively
> supported in Flink [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> Best,
> JING ZHANG
>
> On Tue, Jun 15, 2021 at 5:05 PM Padarn Wilson wrote:
>
> Hi all,
>
> We have a job that we just enabled rocksdb on (instead of file backend),
> and see that the CPU usage is almost 3x greater (we had to increase
> taskmanagers 3x to get it to run).
>
> I don't really understand this, is there something we can look at to
> understand why CPU use is so high? Our state mostly consists of aggregation
> windows.
>
> Cheers,
> Padarn
>
>


Re: RocksDB default logging configuration

2020-04-22 Thread Bajaj, Abhinav
Bumping this one again to catch some attention.

From: "Bajaj, Abhinav" 
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org" 
Subject: RocksDB default logging configuration

Hi,

Some of our teams ran into disk space issues because of the RocksDB default
logging configuration - FLINK-15068.
It seems the suggested workaround uses the OptionsFactory to set some of the
parameters from inside the job.

Since we provision the Flink cluster (version 1.7.1) for the teams, we control
the RocksDB statebackend configuration from flink-conf.yaml.
And it seems there isn't any related RocksDB configuration to set in
flink-conf.yaml.

Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?

Appreciate the help!

~ Abhinav Bajaj

PS:  Sharing below snippet as desired option if possible -

StreamExecutionEnvironment streamExecEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Desired (not existing) API: fetch the cluster's default state backend
StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();
stateBackend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions dbOptions) {
        // Log only warnings and above, and cap each RocksDB log file at 1 MiB
        dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
        dbOptions.setMaxLogFileSize(1024 * 1024);
        return dbOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions columnFamilyOptions) {
        return columnFamilyOptions;
    }
});




Re: RocksDB default logging configuration

2020-04-22 Thread Chesnay Schepler
AFAIK this is not possible; the client doesn't know anything about the 
cluster configuration.


FLINK-15747 proposes to add an additional config option for controlling 
the logging behavior.


The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.



On 23/04/2020 03:24, Bajaj, Abhinav wrote:


Bumping this one again to catch some attention.

*From: *"Bajaj, Abhinav" 
*Date: *Monday, April 20, 2020 at 3:23 PM
*To: *"user@flink.apache.org" 
*Subject: *RocksDB default logging configuration

Hi,

Some of our teams ran into disk space issues because of the RocksDB
default logging configuration - FLINK-15068.

It seems the suggested workaround uses the OptionsFactory to set some
of the parameters from inside the job.

Since we provision the Flink cluster (version 1.7.1) for the teams, we
control the RocksDB statebackend configuration from flink-conf.yaml.

And it seems there isn't any related RocksDB configuration
to set in flink-conf.yaml.


Is there a way for the job developer to retrieve the default 
statebackend information from the cluster in the job and set the 
DBOptions on top of it?


Appreciate the help!

~ Abhinav Bajaj

PS: Sharing below snippet as desired option if possible -

StreamExecutionEnvironment streamExecEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Desired (not existing) API: fetch the cluster's default state backend
StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();
stateBackend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions dbOptions) {
        // Log only warnings and above, and cap each RocksDB log file at 1 MiB
        dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
        dbOptions.setMaxLogFileSize(1024 * 1024);
        return dbOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions columnFamilyOptions) {
        return columnFamilyOptions;
    }
});





Re: RocksDB default logging configuration

2020-04-27 Thread Bajaj, Abhinav
It seems that requiring the checkpoint URL to create the RocksDBStateBackend
mixes the operational aspects of the cluster into the job.
RocksDBStateBackend stateBackend = new RocksDBStateBackend("CHECKPOINT_URL", true);
stateBackend.setDbStoragePath("DB_STORAGE_PATH");

Also, I noticed that the RocksDBStateBackend picks up the savepoint dir from the
property "state.savepoints.dir" of the flink-conf.yaml file but does not pick
up "state.backend.rocksdb.localdir".
So I had to set it from the job as above.

I feel there is a disconnect and would like to get confirmation of the above 
behavior, if possible.
I am using Flink 1.7.1.

Thanks Chesnay for your response below.

~ Abhinav Bajaj

From: Chesnay Schepler 
Date: Wednesday, April 22, 2020 at 11:17 PM
To: "Bajaj, Abhinav" , "user@flink.apache.org" 

Subject: Re: RocksDB default logging configuration

AFAIK this is not possible; the client doesn't know anything about the cluster 
configuration.

FLINK-15747 proposes to add an additional config option for controlling the 
logging behavior.

The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.


On 23/04/2020 03:24, Bajaj, Abhinav wrote:
Bumping this one again to catch some attention.

From: "Bajaj, Abhinav"
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org"
Subject: RocksDB default logging configuration

Hi,

Some of our teams ran into disk space issues because of the RocksDB default
logging configuration - FLINK-15068
(https://issues.apache.org/jira/browse/FLINK-15068).
It seems the suggested workaround uses the OptionsFactory to set some of the
parameters from inside the job.

Since we provision the Flink cluster (version 1.7.1) for the teams, we control
the RocksDB statebackend configuration from flink-conf.yaml.
And it seems there isn't any related RocksDB configuration
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-state-backend)
to set in flink-conf.yaml.

Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?

Appreciate the help!

~ Abhinav Bajaj

PS:  Sharing below snippet as desired option if possible -

StreamExecutionEnvironment streamExecEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Desired (not existing) API: fetch the cluster's default state backend
StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();
stateBackend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions dbOptions) {
        // Log only warnings and above, and cap each RocksDB log file at 1 MiB
        dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
        dbOptions.setMaxLogFileSize(1024 * 1024);
        return dbOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions columnFamilyOptions) {
        return columnFamilyOptions;
    }
});






Re: RocksDB default logging configuration

2020-04-27 Thread Yun Tang
Hi Bajaj

Currently, "state.checkpoints.dir" defines the cluster-wide checkpoint location,
and each job creates its own checkpoint location under it, in a sub-directory
named after the job ID. The same applies to the checkpoint URL passed to the
RocksDB state backend.

And the configuration option "state.backend.rocksdb.localdir" [1] should work
for RocksDB in Flink 1.7.1.

[1] 
https://github.com/apache/flink/blob/808cc1a23abb25bd03d24d75537a1e7c6987eef7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L285-L301
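
To illustrate the directory layout described above, here is a small Java
sketch. It is only string handling, not Flink code: the helper name, the
example base URL and the job ID are made up, and the chk-<n> naming follows
the layout shown in the Flink checkpoint documentation as far as I recall.

```java
// Illustration only; checkpointDir() is a hypothetical helper, not a Flink API.
class CheckpointPaths {

    /** Builds <base>/<job-id>/chk-<n> from a cluster-wide base location. */
    static String checkpointDir(String baseDir, String jobId, long checkpointId) {
        // Drop one trailing slash so the pieces join with single separators.
        String base = baseDir.endsWith("/")
                ? baseDir.substring(0, baseDir.length() - 1)
                : baseDir;
        return base + "/" + jobId + "/chk-" + checkpointId;
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for state.checkpoints.dir and a job ID.
        System.out.println(checkpointDir("hdfs:///flink/checkpoints/", "abc123", 7));
    }
}
```

So the same base location can be shared by every job on the cluster without
the jobs overwriting each other's checkpoints.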

Best
Yun Tang

From: Bajaj, Abhinav 
Sent: Tuesday, April 28, 2020 8:03
To: user@flink.apache.org 
Cc: Chesnay Schepler 
Subject: Re: RocksDB default logging configuration


It seems that requiring the checkpoint URL to create the RocksDBStateBackend
mixes the operational aspects of the cluster into the job.

RocksDBStateBackend stateBackend = new RocksDBStateBackend("CHECKPOINT_URL", true);
stateBackend.setDbStoragePath("DB_STORAGE_PATH");

Also, I noticed that the RocksDBStateBackend picks up the savepoint dir from the
property "state.savepoints.dir" of the flink-conf.yaml file but does not pick
up "state.backend.rocksdb.localdir".

So I had to set it from the job as above.



I feel there is a disconnect and would like to get confirmation of the above 
behavior, if possible.

I am using Flink 1.7.1.



Thanks Chesnay for your response below.



~ Abhinav Bajaj



From: Chesnay Schepler 
Date: Wednesday, April 22, 2020 at 11:17 PM
To: "Bajaj, Abhinav" , "user@flink.apache.org" 

Subject: Re: RocksDB default logging configuration





AFAIK this is not possible; the client doesn't know anything about the cluster 
configuration.



FLINK-15747 proposes to add an additional config option for controlling the 
logging behavior.



The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.





On 23/04/2020 03:24, Bajaj, Abhinav wrote:

Bumping this one again to catch some attention.



From: "Bajaj, Abhinav"
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org"
Subject: RocksDB default logging configuration



Hi,



Some of our teams ran into disk space issues because of the RocksDB default 
logging configuration - 
FLINK-15068 <https://issues.apache.org/jira/browse/FLINK-15068>.

It seems the workaround suggested uses the OptionsFactory to set some of the 
parameters from inside the job.



Since we provision the Flink cluster (version 1.7.1) for the teams, we control 
the RocksDB state backend configuration from flink-conf.yaml.

And it seems there isn’t any related RocksDB configuration option 
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-state-backend>
to set in flink-conf.yaml.



Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?



Appreciate the help!



~ Abhinav Bajaj



PS:  Sharing below snippet as desired option if possible -



StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();

StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();

stateBackend.setOptions(new OptionsFactory() {

@Override
public DBOptions createDBOptions(DBOptions dbOptions) {
  dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
  dbOptions.setMaxLogFileSize(1024 * 1024);
  return dbOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
columnFamilyOptions) {
  return columnFamilyOptions;
}

});








Re: RocksDB default logging configuration

2020-04-28 Thread Bajaj, Abhinav
Thanks Yun for your response.

It seems creating the RocksDBStateBackend from the job requires providing the 
checkpoint URL, whereas the savepoint URL seems to default to 
“state.savepoints.dir” from flink-conf.yaml.

I was expecting similar behavior: creating the RocksDBStateBackend without 
providing the checkpoint URL would default to “state.checkpoints.dir” from 
flink-conf.yaml, as with savepoints.
But it seems there is no option to do that (see my original mail).

Am I misinterpreting the code or documentation? Is my observation correct?

Appreciate the engagement.

Thanks much,
~ Abhinav Bajaj


Re: RocksDB default logging configuration

2020-04-29 Thread Yun Tang
Hi Bajaj

Actually, I don't fully understand your description, which conflicts 
with the Flink codebase. Please follow either one of the guides below to create 
the RocksDBStateBackend:


  *   Set the state backend on the environment programmatically, which takes 
priority over the configuration in flink-conf.yaml:
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

  *   Configure at least the two options below in flink-conf.yaml to use 
RocksDBStateBackend (this can be overridden by setting the state backend 
explicitly on the environment):
state.backend: rocksdb
state.checkpoints.dir: hdfs:///checkpoint-path

Best
Yun Tang


Re: RocksDB State Backend Exception

2018-10-25 Thread Andrey Zagrebin
Hi Ning,

The problem here, first of all, is that the RocksDB Java JNI client has diverged from 
the RocksDB C++ code in status.h,
as mentioned in the Flink issue you refer to.

Flink 1.6 uses the RocksDB 5.7.5 Java client. 
The JNI code there is missing these status subcodes:
kNoSpace = 4,
kDeadlock = 5,
kStaleFile = 6,
kMemoryLimit = 7
which could be potential problems in the job.

kNoSpace is only one of them.
Another probable cause could be kStaleFile, i.e. some file system IO problem.
kDeadlock seems to be used only with transactions, so it is not relevant.
kMemoryLimit means that a write batch exceeded the max size, but as far as I 
understand we do not set a limit for it.

It would be easier to debug if the RocksDB JNI client at least logged the 
unknown subcode, but I do not see any easy way to log it in the current version 
without rebuilding RocksDB and subsequently Flink.

In the master branch, the Java Status class and status.h are also out of sync. You 
could report this issue in the RocksDB repo and suggest extending the exception 
message with the numeric value of the unknown subcode. The Flink community plans 
to upgrade to the latest RocksDB version again in one of the next Flink releases.
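
To make the suggestion about reporting the unknown subcode concrete, here is a minimal sketch of the lookup pattern. It is not the org.rocksdb.Status class: the class and method names are hypothetical, and the assumption that values 0 to 3 are the ones the old client knows is inferred only from the list above starting at 4.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a stand-in for the subcode lookup, not the RocksDB Java API.
final class SubCodeSketch {

    // Subcodes listed above as missing from the 5.7.5 Java client.
    private static final Map<Integer, String> MISSING_IN_OLD_CLIENT = new HashMap<>();
    static {
        MISSING_IN_OLD_CLIENT.put(4, "kNoSpace");
        MISSING_IN_OLD_CLIENT.put(5, "kDeadlock");
        MISSING_IN_OLD_CLIENT.put(6, "kStaleFile");
        MISSING_IN_OLD_CLIENT.put(7, "kMemoryLimit");
    }

    // Strict lookup: accepts only 0..3 (assumed known values) and, unlike the
    // failing call in the stack trace, includes the raw value in the message.
    static int strictLookup(int value) {
        if (value < 0 || value > 3) {
            throw new IllegalArgumentException(
                    "Illegal value provided for SubCode: " + value);
        }
        return value;
    }

    // Tolerant lookup: names the subcodes from the list above, else reports the number.
    static String describe(int value) {
        if (value >= 0 && value <= 3) {
            return "subcode " + value;
        }
        String name = MISSING_IN_OLD_CLIENT.get(value);
        return name != null ? name + " (" + value + ")" : "unknown subcode " + value;
    }

    public static void main(String[] args) {
        System.out.println(describe(4));
        System.out.println(describe(6));
        try {
            strictLookup(6);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the raw number in the message, a kNoSpace (4) could be told apart from a kStaleFile (6) directly from the job logs.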

Best,
Andrey

> On 25 Oct 2018, at 04:31, Ning Shi  wrote:
> 
> Hi,
> 
> We are doing some performance testing on a 12 node cluster with 8 task
> slots per TM. Every 15 minutes or so, the job would run into the
> following exception.
> 
> java.lang.IllegalArgumentException: Illegal value provided for SubCode.
>   at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
>   at org.rocksdb.Status.<init>(Status.java:30)
>   at org.rocksdb.RocksDB.put(Native Method)
>   at org.rocksdb.RocksDB.put(RocksDB.java:511)
>   at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:80)
>   at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:99)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> 
> I saw an outstanding issue with similar exception in [1]. The ticket
> description suggests that it was due to out of disk error, but in our
> case, we have plenty of disk left on all TMs.
> 
> Has anyone run into this before? If so, is there a fix or workaround?
> 
> Thanks,
> 
> [1] https://issues.apache.org/jira/browse/FLINK-9233
> 
> --
> Ning



Re: RocksDB State Backend Exception

2018-10-25 Thread Ning Shi
Hi Andrey,

Thank you for the explanation. I think you are right. It is either
kStaleFile or kNoSpace. We found the cause of the issue, even though we
still don't know how to explain it.

We set the java.io.tmpdir to an EBS-backed drive instead of the
default and the exception started happening. The issue was gone after we
changed it back to use the default.

Thanks,

On Thu, Oct 25, 2018 at 02:23:31PM +0200, Andrey Zagrebin wrote:
> Hi Ning,
>
> The problem here first of all is that RocksDB java JNI client diverged from 
> RocksDB cpp code in status.h,
> as mentioned in the Flink issue you refer to.
>
> Flink 1.6 uses RocksDB 5.7.5 java client.
> The JNI code there misses these status subcodes:
> kNoSpace = 4,
> kDeadlock = 5,
> kStaleFile = 6,
> kMemoryLimit = 7
> which could be potential problems in the job.
>
> kNoSpace is only one of them.
> Another probable cause could be kStaleFile, some file system IO problem.
> kDeadlock seems to be used only with transactions, so not relevant.
> kMemoryLimit means that write batch exceeded max size, but we do not have 
> limit for it as I understand.
>
> It would be easier to debug if RocksDB JNI client would at least log the 
> unknown subcode but i do not see any easy way to log it in the current 
> version, without rebuilding RocksDB and subsequently Flink.
>
> In master branch, java Status and status.h are also unsynced. You could 
> report this issue in RocksDB repo, along with extending exception logging 
> message with the number of unknown error code. Flink community plans to 
> upgrade to the latest RocksDB version again in one of the next Flink releases.
>
> Best,
> Andrey

--
Ning


Re: RocksDB native checkpoint time

2019-05-03 Thread Piotr Nowojski
Hi Gyula,

Have you read our tuning guide?
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
 


The synchronous part is mostly about flushing data to disk, so you could try to 
optimise your setup with that in mind: limiting the size of the page cache, 
speeding up writes (using more/faster disks…), etc. Maybe you can also look 
for online resources on how to speed up calls to `org.rocksdb.Checkpoint#create`.

Piotrek

> On 3 May 2019, at 10:30, Gyula Fóra  wrote:
> 
> Hi!
> 
> Does anyone know what parameters might affect the RocksDB native checkpoint 
> time? (basically the sync part of the rocksdb incremental snapshots)
> 
> It seems to take 60-70 secs in some cases for larger state sizes, and I 
> wonder if there is anything we could tune to reduce this. Maybe its only a 
> matter of size i dont know.
> 
> Any ideas would be appreciated :)
> Gyula



Re: RocksDB native checkpoint time

2019-05-03 Thread Stefan Richter
Hi,

out of curiosity, does it happen with jobs that have a large number of states 
(column groups) or also for jobs with few column groups and just “big state”?

Best,
Stefan

> On 3. May 2019, at 11:04, Piotr Nowojski  wrote:
> 
> Hi Gyula,
> 
> Have you read our tuning guide?
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
>  
> 
> 
> Synchronous part is mostly about flushing data to disks, so you could try to 
> optimise your setup having that in mind. Limiting the size of a page cache, 
> speeding up the writes (using more/faster disks…), etc… Maybe you can also 
> look at online resources how to speedup calls to 
> `org.rocksdb.Checkpoint#create`.
> 
> Piotrek
> 
>> On 3 May 2019, at 10:30, Gyula Fóra wrote:
>> 
>> Hi!
>> 
>> Does anyone know what parameters might affect the RocksDB native checkpoint 
>> time? (basically the sync part of the rocksdb incremental snapshots)
>> 
>> It seems to take 60-70 secs in some cases for larger state sizes, and I 
>> wonder if there is anything we could tune to reduce this. Maybe its only a 
>> matter of size i dont know.
>> 
>> Any ideas would be appreciated :)
>> Gyula
> 



Re: RocksDB native checkpoint time

2019-05-03 Thread Gyula Fóra
Thanks Piotr for the tips, we will play around with some settings.

@Stefan
It is a few columns but a lot of rows

Gyula

On Fri, May 3, 2019 at 11:43 AM Stefan Richter 
wrote:

> Hi,
>
> out of curiosity, does it happen with jobs that have a large number of
> states (column groups) or also for jobs with few column groups and just
> “big state”?
>
> Best,
> Stefan
>
> On 3. May 2019, at 11:04, Piotr Nowojski  wrote:
>
> Hi Gyula,
>
> Have you read our tuning guide?
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
>
> Synchronous part is mostly about flushing data to disks, so you could try
> to optimise your setup having that in mind. Limiting the size of a page
> cache, speeding up the writes (using more/faster disks…), etc… Maybe you
> can also look at online resources how to speedup calls to
> `org.rocksdb.Checkpoint#create`.
>
> Piotrek
>
> On 3 May 2019, at 10:30, Gyula Fóra  wrote:
>
> Hi!
>
> Does anyone know what parameters might affect the RocksDB native
> checkpoint time? (basically the sync part of the rocksdb incremental
> snapshots)
>
> It seems to take 60-70 secs in some cases for larger state sizes, and I
> wonder if there is anything we could tune to reduce this. Maybe its only a
> matter of size i dont know.
>
> Any ideas would be appreciated :)
> Gyula
>
>
>
>


Re: RocksDB native checkpoint time

2019-05-03 Thread Konstantin Knauf
Hi Gyula,

I looked into this a bit recently as well and did some experiments (on my
local machine). The only parameter that significantly changed anything in
this setup was reducing the total size of the write buffers (number or size
of memtables). I was not able to find any online resources on the performance
of checkpoint creation in RocksDB, so I am looking forward to your findings...

Cheers,

Konstantin


On Fri, May 3, 2019 at 12:10 PM Gyula Fóra  wrote:

> Thanks Piotr for the tips we will play around with some settings.
>
> @Stefan
> It is a few columns but a lot of rows
>
> Gyula
>
> On Fri, May 3, 2019 at 11:43 AM Stefan Richter 
> wrote:
>
>> Hi,
>>
>> out of curiosity, does it happen with jobs that have a large number of
>> states (column groups) or also for jobs with few column groups and just
>> “big state”?
>>
>> Best,
>> Stefan
>>
>> On 3. May 2019, at 11:04, Piotr Nowojski  wrote:
>>
>> Hi Gyula,
>>
>> Have you read our tuning guide?
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
>>
>> Synchronous part is mostly about flushing data to disks, so you could try
>> to optimise your setup having that in mind. Limiting the size of a page
>> cache, speeding up the writes (using more/faster disks…), etc… Maybe you
>> can also look at online resources how to speedup calls to
>> `org.rocksdb.Checkpoint#create`.
>>
>> Piotrek
>>
>> On 3 May 2019, at 10:30, Gyula Fóra  wrote:
>>
>> Hi!
>>
>> Does anyone know what parameters might affect the RocksDB native
>> checkpoint time? (basically the sync part of the rocksdb incremental
>> snapshots)
>>
>> It seems to take 60-70 secs in some cases for larger state sizes, and I
>> wonder if there is anything we could tune to reduce this. Maybe its only a
>> matter of size i dont know.
>>
>> Any ideas would be appreciated :)
>> Gyula
>>
>>
>>
>>

-- 

Konstantin Knauf | Solutions Architect



Re: RocksDB native checkpoint time

2019-05-14 Thread Gyula Fóra
Hey,

I have collected some RocksDB logs for the snapshot itself, but I can't
really work out where exactly the time is spent:
https://gist.github.com/gyfora/9a37aa349f63c35cd6abe2da2cf19d5b

The general pattern where the time is spent is this:
2019/05/14-09:15:49.486455 7fbe6a8ee700 [db/db_impl_write.cc:1127] [new-timer-state] New memtable created with log file: #111757. Immutable memtables: 0.
2019/05/14-09:15:59.191010 7fb3cdc1d700 (Original Log Time 2019/05/14-09:15:59.191000) [db/db_impl_compaction_flush.cc:1216] Calling FlushMemTableToOutputFile with column family [new-timer-state], flush slots available 1, compaction slots available 1, flush slots scheduled 1, compaction slots scheduled 0

In this example, about 10 seconds pass between these two log entries, but
sometimes it's 40. Based on the log wording, I don't understand what exactly
is going on in between. Maybe someone with hands-on experience with RocksDB
has some insights.
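
For working through a longer log, the gap between two entries can be computed rather than eyeballed. The sketch below is only a helper idea, with a hypothetical class name; it assumes every log line starts with a timestamp in the format shown above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative only: computes the time between two RocksDB LOG timestamps.
final class RocksDbLogGapSketch {

    // Matches the leading timestamp of the log lines above, e.g. 2019/05/14-09:15:49.486455
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy/MM/dd-HH:mm:ss.SSSSSS");

    // Returns the elapsed time from the earlier timestamp to the later one.
    static Duration gap(String earlier, String later) {
        return Duration.between(
                LocalDateTime.parse(earlier, LOG_TIME),
                LocalDateTime.parse(later, LOG_TIME));
    }

    public static void main(String[] args) {
        // The two timestamps are taken from the log excerpt above.
        Duration d = gap("2019/05/14-09:15:49.486455", "2019/05/14-09:15:59.191010");
        System.out.println(d.toMillis() + " ms");
    }
}
```

For the two entries quoted above this works out to roughly 9.7 seconds, in line with the "10 seconds" estimate.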

Thanks,
Gyula


Re: RocksDB efficiency and keyby

2022-04-20 Thread Yaroslav Tkachenko
Hey Trystan,

Based on my personal experience, good disk IO for RocksDB matters a lot.
Are you using the fastest SSD storage you can get for RocksDB folders?

For example, when running on GCP, we noticed a *10x* throughput improvement
by switching RocksDB storage to
https://cloud.google.com/compute/docs/disks/local-ssd

On Wed, Apr 20, 2022 at 8:50 AM Trystan  wrote:

> Hello,
>
> We have a job where its main purpose is to track whether or not we've
> previously seen a particular event - that's it. If it's new, we save it to
> an external database. If we've seen it, we block the write. There's a 3-day
> TTL to manage the state size. The downstream db can tolerate new data
> slipping through and reject the write - we mainly use the state to reduce
> writes.
>
> We're starting to see some performance issues, even after adding 50%
> capacity to the job. After some number of days/weeks, it eventually goes
> into a constant backpressure situation. I'm wondering if there's something
> we can do to improve efficiency.
>
> 1. According to the flamegraph, 60-70% of the time is spent in RocksDB.get
> 2. The state is just a ValueState. I assume this is the
> smallest/most efficient state. The keyby is extremely high cardinality -
> are we better off with a lower cardinality and a MapState
> .contains() check?
> 3. Current configs: taskmanager.memory.process.size:
> 4g, taskmanager.memory.managed.fraction: 0.8 (increased from 0.6, didn't
> see much change)
> 4. Estimated num keys tops out somewhere around 9-10B. Estimated live data
> size somewhere around 250 GB. Attempting to switch to heap state
> immediately ran into OOM (parallelism: 120, 8gb memory each).
>
> And perhaps the answer is just "scale out" :) but if there are any signals
> to know when we've reached the limit of current scale, it'd be great to
> know what signals to look for!
>
> Thanks!
> Trystan
>


Re: RocksDB efficiency and keyby

2022-04-20 Thread Trystan
Thanks for the info! We're running EBS gp2 volumes... a while back we tested
local SSDs with a different job and didn't notice any gains, but that was
likely due to an under-optimized job where the bottleneck was elsewhere.

On Wed, Apr 20, 2022, 11:08 AM Yaroslav Tkachenko 
wrote:

> Hey Trystan,
>
> Based on my personal experience, good disk IO for RocksDB matters a lot.
> Are you using the fastest SSD storage you can get for RocksDB folders?
>
> For example, when running on GCP, we noticed *10x* throughput improvement
> by switching RocksDB storage to
> https://cloud.google.com/compute/docs/disks/local-ssd
>
> On Wed, Apr 20, 2022 at 8:50 AM Trystan  wrote:
>
>> Hello,
>>
>> We have a job where its main purpose is to track whether or not we've
>> previously seen a particular event - that's it. If it's new, we save it to
>> an external database. If we've seen it, we block the write. There's a 3-day
>> TTL to manage the state size. The downstream db can tolerate new data
>> slipping through and reject the write - we mainly use the state to reduce
>> writes.
>>
>> We're starting to see some performance issues, even after adding 50%
>> capacity to the job. After some number of days/weeks, it eventually goes
>> into a constant backpressure situation. I'm wondering if there's something
>> we can do to improve efficiency.
>>
>> 1. According to the flamegraph, 60-70% of the time is spent in RocksDB.get
>> 2. The state is just a ValueState. I assume this is the
>> smallest/most efficient state. The keyby is extremely high cardinality -
>> are we better off with a lower cardinality and a MapState
>> .contains() check?
>> 3. Current configs: taskmanager.memory.process.size:
>> 4g, taskmanager.memory.managed.fraction: 0.8 (increased from 0.6, didn't
>> see much change)
>> 4. Estimated num keys tops out somewhere around 9-10B. Estimated live
>> data size somewhere around 250 GB. Attempting to switch to heap state
>> immediately ran into OOM (parallelism: 120, 8gb memory each).
>>
>> And perhaps the answer is just "scale out" :) but if there are any
>> signals to know when we've reached the limit of current scale, it'd be
>> great to know what signals to look for!
>>
>> Thanks!
>> Trystan
>>
>


Re: RocksDB efficiency and keyby

2022-04-20 Thread Yaroslav Tkachenko
Yep, I'd give it another try. EBS could be too slow in some use-cases.

On Wed, Apr 20, 2022 at 9:39 AM Trystan  wrote:

> Thanks for the info! We're running EBS gp2 volumes... awhile back we
> tested local SSDs with a different job and didn't notice any gains, but
> that was likely due to an under-optimized job where the bottleneck was
> elsewhere
>
> On Wed, Apr 20, 2022, 11:08 AM Yaroslav Tkachenko 
> wrote:
>
>> Hey Trystan,
>>
>> Based on my personal experience, good disk IO for RocksDB matters a lot.
>> Are you using the fastest SSD storage you can get for RocksDB folders?
>>
>> For example, when running on GCP, we noticed *10x* throughput
>> improvement by switching RocksDB storage to
>> https://cloud.google.com/compute/docs/disks/local-ssd
>>
>> On Wed, Apr 20, 2022 at 8:50 AM Trystan  wrote:
>>
>>> Hello,
>>>
>>> We have a job where its main purpose is to track whether or not we've
>>> previously seen a particular event - that's it. If it's new, we save it to
>>> an external database. If we've seen it, we block the write. There's a 3-day
>>> TTL to manage the state size. The downstream db can tolerate new data
>>> slipping through and reject the write - we mainly use the state to reduce
>>> writes.
>>>
>>> We're starting to see some performance issues, even after adding 50%
>>> capacity to the job. After some number of days/weeks, it eventually goes
>>> into a constant backpressure situation. I'm wondering if there's something
>>> we can do to improve efficiency.
>>>
>>> 1. According to the flamegraph, 60-70% of the time is spent in
>>> RocksDB.get
>>> 2. The state is just a ValueState. I assume this is the
>>> smallest/most efficient state. The keyby is extremely high cardinality -
>>> are we better off with a lower cardinality and a MapState
>>> .contains() check?
>>> 3. Current configs: taskmanager.memory.process.size:
>>> 4g, taskmanager.memory.managed.fraction: 0.8 (increased from 0.6, didn't
>>> see much change)
>>> 4. Estimated num keys tops out somewhere around 9-10B. Estimated live
>>> data size somewhere around 250 GB. Attempting to switch to heap state
>>> immediately ran into OOM (parallelism: 120, 8gb memory each).
>>>
>>> And perhaps the answer is just "scale out" :) but if there are any
>>> signals to know when we've reached the limit of current scale, it'd be
>>> great to know what signals to look for!
>>>
>>> Thanks!
>>> Trystan
>>>
>>


Re: RocksDB efficiency and keyby

2022-04-21 Thread Yun Tang
Hi Trystan,

You can use async-profiler [1] to inspect the CPU stacks inside RocksDB and see 
where the time goes. If the call stacks are dominated by loading index or filter 
blocks, you could try enabling partitioned index & filters [2].

[1] https://github.com/jvm-profiling-tools/async-profiler
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-memory-partitioned-index-filters

Best
Yun Tang



From: Yaroslav Tkachenko 
Sent: Thursday, April 21, 2022 0:44
To: Trystan 
Cc: user 
Subject: Re: RocksDB efficiency and keyby

Yep, I'd give it another try. EBS could be too slow in some use-cases.

On Wed, Apr 20, 2022 at 9:39 AM Trystan wrote:
Thanks for the info! We're running EBS gp2 volumes... awhile back we tested 
local SSDs with a different job and didn't notice any gains, but that was 
likely due to an under-optimized job where the bottleneck was elsewhere

On Wed, Apr 20, 2022, 11:08 AM Yaroslav Tkachenko wrote:
Hey Trystan,

Based on my personal experience, good disk IO for RocksDB matters a lot. Are 
you using the fastest SSD storage you can get for RocksDB folders?

For example, when running on GCP, we noticed 10x throughput improvement by 
switching RocksDB storage to 
https://cloud.google.com/compute/docs/disks/local-ssd

On Wed, Apr 20, 2022 at 8:50 AM Trystan <entro...@gmail.com> wrote:
Hello,

We have a job whose main purpose is to track whether or not we've 
previously seen a particular event - that's it. If it's new, we save it to an 
external database. If we've seen it, we block the write. There's a 3-day TTL to 
manage the state size. The downstream db can tolerate new data slipping through 
and reject the write - we mainly use the state to reduce writes.
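The seen-before check described above can be sketched in plain Java, with an in-memory map standing in for keyed state. This is only an illustration of the logic, not Flink code: the names `SeenFilter` and `shouldWrite` are hypothetical, and starting the TTL clock when the key is first written is an assumption about how the TTL is configured.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the keyed "seen" state. A real job would keep this
// in Flink keyed state with a state TTL, not in a HashMap.
class SeenFilter {
    private final long ttlMillis;
    private final Map<String, Long> firstSeenAt = new HashMap<>();

    SeenFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true when the event is new (or its entry has expired) and should
    // be written downstream; returns false when the write should be blocked.
    boolean shouldWrite(String eventId, long nowMillis) {
        Long seenAt = firstSeenAt.get(eventId);
        if (seenAt != null && nowMillis - seenAt < ttlMillis) {
            return false;
        }
        firstSeenAt.put(eventId, nowMillis);
        return true;
    }

    public static void main(String[] args) {
        long threeDays = 3L * 24 * 60 * 60 * 1000;
        SeenFilter filter = new SeenFilter(threeDays);
        System.out.println(filter.shouldWrite("event-1", 0L));
        System.out.println(filter.shouldWrite("event-1", 1_000L));
        System.out.println(filter.shouldWrite("event-1", threeDays));
    }
}
```

Every incoming event costs one point lookup, which fits the flamegraph observation that most of the time is spent in RocksDB.get.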

We're starting to see some performance issues, even after adding 50% capacity 
to the job. After some number of days/weeks, it eventually goes into a constant 
backpressure situation. I'm wondering if there's something we can do to improve 
efficiency.

1. According to the flamegraph, 60-70% of the time is spent in RocksDB.get
2. The state is just a ValueState. I assume this is the smallest/most 
efficient state. The keyby is extremely high cardinality - are we better off 
with a lower cardinality and a MapState .contains() check?
3. Current configs: taskmanager.memory.process.size: 4g, 
taskmanager.memory.managed.fraction: 0.8 (increased from 0.6, didn't see much 
change)
4. Estimated num keys tops out somewhere around 9-10B. Estimated live data size 
somewhere around 250 GB. Attempting to switch to heap state immediately ran 
into OOM (parallelism: 120, 8gb memory each).
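A back-of-the-envelope calculation from the figures above may help explain the heap OOM. This is a rough sketch only: it assumes keys are spread evenly across subtasks and that the whole 8 GB per subtask is available for state, which is optimistic. The class and method names are made up for illustration.

```java
class StateSizing {
    // Average bytes of live data per key.
    static double bytesPerKey(double liveBytes, double numKeys) {
        return liveBytes / numKeys;
    }

    // Share of a total that lands on one subtask, assuming an even spread.
    static double perSubtask(double total, int parallelism) {
        return total / parallelism;
    }

    public static void main(String[] args) {
        double liveBytes = 250e9;   // ~250 GB live data, from the message
        double numKeys = 10e9;      // upper end of the 9-10B estimate
        int parallelism = 120;      // from the heap-state attempt
        double heapBytes = 8e9;     // 8 GB each, treated as fully usable (optimistic)

        double keysPerSubtask = perSubtask(numKeys, parallelism);
        System.out.printf("serialized bytes per key: %.1f%n", bytesPerKey(liveBytes, numKeys));
        System.out.printf("keys per subtask (millions): %.1f%n", keysPerSubtask / 1e6);
        System.out.printf("live GB per subtask: %.2f%n", perSubtask(liveBytes, parallelism) / 1e9);
        System.out.printf("heap budget per key (bytes): %.1f%n", heapBytes / keysPerSubtask);
    }
}
```

Under these assumptions each key gets under 100 bytes of heap, which leaves little room for Java object and hash-table overhead on top of the key and value themselves.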

And perhaps the answer is just "scale out" :) but if there are any signals to 
know when we've reached the limit of current scale, it'd be great to know what 
signals to look for!

Thanks!
Trystan


Re: RocksDB state checkpointing is expensive?

2016-04-07 Thread Aljoscha Krettek
Hi,
you are right. Currently there is no incremental checkpointing and
therefore, at each checkpoint, we essentially copy the whole RocksDB
database to HDFS (or whatever filesystem you chose as a backup location).
As far as I know, Stephan will start working on adding support for
incremental snapshots this week or next week.
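To make the cost difference concrete, here is a toy comparison of a full copy against a diff-based snapshot. It is not how Flink does (or will do) it; it only assumes that RocksDB's on-disk files are immutable once written, so a file already present in the previous snapshot would not need to be copied again. The class, method and file names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class SnapshotCost {
    // Full snapshot: every file of the database is copied.
    static long fullBytes(Map<String, Long> currentFiles) {
        long sum = 0;
        for (long size : currentFiles.values()) {
            sum += size;
        }
        return sum;
    }

    // Diff-based snapshot: only files absent from the previous snapshot are copied.
    static long incrementalBytes(Map<String, Long> previousFiles, Map<String, Long> currentFiles) {
        long sum = 0;
        for (Map.Entry<String, Long> file : currentFiles.entrySet()) {
            if (!previousFiles.containsKey(file.getKey())) {
                sum += file.getValue();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> previous = new TreeMap<>();
        previous.put("000001.sst", 64L);
        previous.put("000002.sst", 64L);

        Map<String, Long> current = new TreeMap<>(previous);
        current.put("000003.sst", 8L);   // the only file written since the last checkpoint

        System.out.println("full copy:  " + fullBytes(current));
        System.out.println("diff copy:  " + incrementalBytes(previous, current));
    }
}
```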

Cheers,
Aljoscha

On Thu, 7 Apr 2016 at 09:55 Krzysztof Zarzycki  wrote:

> Hi,
> I looked at the documentation and source code of the state management with
> RocksDB and, before I use it, I'm concerned about one thing: am I right that
> currently, when state is checkpointed, the whole RocksDB state is
> snapshotted? There is no incremental, diff-based snapshotting, is there? If
> so, this seems infeasible for keeping state measured in tens or hundreds of
> GBs (and you reach that size of state when you want to keep embedded
> state in the streaming application instead of going out to Cassandra/HBase
> or another DB). It will just cost too much to snapshot such large
> state.
>
> Samza, as a good example for comparison, writes every state change to a Kafka
> topic, treating it as a snapshot in the shape of a changelog. Of course, at
> the moment of app restart, recovering the state from the changelog would be
> too costly, which is why the changelog topic is compacted. Plus, I think
> Samza takes a state snapshot from time to time anyway (but I'm not sure of
> that).
>
> Thanks for answering my doubts,
> Krzysztof
>
>


Re: RocksDB state checkpointing is expensive?

2016-04-07 Thread Krzysztof Zarzycki
OK, Thanks Aljoscha for the info!
Guys, great work on Flink, I really love it :)

Cheers,
Krzysztof

On Thu, 7 Apr 2016 at 10:48, Aljoscha Krettek wrote:

> Hi,
> you are right. Currently there is no incremental checkpointing and
> therefore, at each checkpoint, we essentially copy the whole RocksDB
> database to HDFS (or whatever filesystem you chose as a backup location).
> As far as I know, Stephan will start working on adding support for
> incremental snapshots this week or next week.
>
> Cheers,
> Aljoscha
>
> On Thu, 7 Apr 2016 at 09:55 Krzysztof Zarzycki 
> wrote:
>
>> Hi,
>> I looked at the documentation and source code of the state management with
>> RocksDB and, before I use it, I'm concerned about one thing: am I right that
>> currently, when state is checkpointed, the whole RocksDB state is
>> snapshotted? There is no incremental, diff-based snapshotting, is there? If
>> so, this seems infeasible for keeping state measured in tens or hundreds of
>> GBs (and you reach that size of state when you want to keep embedded
>> state in the streaming application instead of going out to Cassandra/HBase
>> or another DB). It will just cost too much to snapshot such large
>> state.
>>
>> Samza, as a good example for comparison, writes every state change to a Kafka
>> topic, treating it as a snapshot in the shape of a changelog. Of course, at
>> the moment of app restart, recovering the state from the changelog would be
>> too costly, which is why the changelog topic is compacted. Plus, I think
>> Samza takes a state snapshot from time to time anyway (but I'm not sure of
>> that).
>>
>> Thanks for answering my doubts,
>> Krzysztof
>>
>>


Re: RocksDB error with flink 1.2.0

2017-04-28 Thread Aljoscha Krettek
The problem here is that this will try to open 300 RocksDB instances on each of 
the TMs (depending on how the parallelism is spread between the machines this 
could be more or less). As the exception says, this will open too many files 
because each RocksDB instance has a directory with several files in it.
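As a rough illustration of the numbers involved, the sketch below multiplies out the figures from the report (about 300 patterns, parallelism 6, 5 TMs). It assumes one RocksDB instance per pattern operator subtask and an even spread across TMs; the files-per-instance value is purely an assumed placeholder, not a measured figure.

```java
class RocksInstanceEstimate {
    // One RocksDB instance per stateful operator subtask (patterns x parallelism).
    static int totalInstances(int patterns, int parallelism) {
        return patterns * parallelism;
    }

    // Instances on the busiest TaskManager if subtasks are spread evenly (ceiling division).
    static int perTaskManager(int totalInstances, int taskManagers) {
        return (totalInstances + taskManagers - 1) / taskManagers;
    }

    public static void main(String[] args) {
        int total = totalInstances(300, 6);     // figures from the report
        int perTm = perTaskManager(total, 5);   // 5 TMs, from the report
        int filesPerInstance = 10;              // ASSUMPTION: illustrative only
        System.out.println("instances in total: " + total);
        System.out.println("instances per TM:   " + perTm);
        System.out.println("open files per TM:  " + perTm * filesPerInstance);
    }
}
```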

One possible solution would be to increase the limit on open files but I don’t 
think that opening 300 RocksDB instances on one machine is a good idea for any 
size of machine. I think with this many patterns you could start thinking about 
writing the pattern matching yourself and multiplexing the several patterns in 
one stateful function or operator.
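The multiplexing idea could look roughly like the sketch below: one matcher holds all patterns and a single state structure, so only one state store is needed per parallel instance. This is plain Java and not the CEP library; the two-step `Pattern` (one event type followed later by another) and all names are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MultiplexedMatcher {
    // Hypothetical two-step pattern: event type `first` followed later by `second`.
    static final class Pattern {
        final String id;
        final String first;
        final String second;

        Pattern(String id, String first, String second) {
            this.id = id;
            this.first = first;
            this.second = second;
        }
    }

    private final List<Pattern> patterns;
    // Single state structure shared by all patterns:
    // key -> ids of patterns whose first step has already matched for that key.
    private final Map<String, Set<String>> started = new HashMap<>();

    MultiplexedMatcher(List<Pattern> patterns) {
        this.patterns = patterns;
    }

    // Runs one event through every pattern and returns the ids of the patterns it completes.
    List<String> onEvent(String key, String eventType) {
        Set<String> open = started.computeIfAbsent(key, k -> new HashSet<>());
        List<String> completed = new ArrayList<>();
        for (Pattern p : patterns) {
            if (open.contains(p.id) && p.second.equals(eventType)) {
                open.remove(p.id);
                completed.add(p.id);
            } else if (p.first.equals(eventType)) {
                open.add(p.id);
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        List<Pattern> patterns = new ArrayList<>();
        patterns.add(new Pattern("p1", "A", "B"));
        patterns.add(new Pattern("p2", "A", "C"));
        MultiplexedMatcher matcher = new MultiplexedMatcher(patterns);
        System.out.println(matcher.onEvent("user-1", "A"));
        System.out.println(matcher.onEvent("user-1", "B"));
        System.out.println(matcher.onEvent("user-1", "C"));
    }
}
```

The trade-off is that timeouts, ordering and more complex pattern semantics would have to be reimplemented by hand.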

@Stefan, what do you think about having this many Rocks instances?

Best,
Aljoscha

> On 28. Apr 2017, at 17:05, mclendenin  wrote:
> 
> Starting ~300 CEP patterns with a parallelism of 6, since there are 6 partitions
> on a Kafka topic. Checkpointing with RocksDB to Hadoop at an interval of 50
> seconds. The cluster is HA with 2 JMs and 5 TMs. Getting the following exception:
> 
> 
> java.io.IOException: Error creating ColumnFamilyHandle.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:830)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createValueState(RocksDBKeyedStateBackend.java:838)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:251)
>     at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
>     at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:248)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:557)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:542)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.open(AbstractKeyedCEPPatternOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.rocksdb.RocksDBException: IO error: /flink/tmp/flink-io-c60bed30-5ca4-4eed-b5b4-14f3c945a46a/job-a150d7d59aafadcf922f2f397c59d6d1_op-KeyedCEPPatternOperator_874_3_uuid-3f3fea55-1af6-43fe-8e20-b213a8e06d28/db/MANIFEST-06: Too many open files
>     at org.rocksdb.RocksDB.createColumnFamily(Native Method)
>     at org.rocksdb.RocksDB.createColumnFamily(RocksDB.java:1323)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:823)
>     ... 12 more
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RocksDB-error-with-flink-1-2-0-tp12897.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: RocksDB error with flink 1.2.0

2017-04-28 Thread Marcus Clendenin
I changed the max number of open files and got past this error, but now I'm
seeing errors saying it's unable to flush the file. I am checkpointing to
HDFS; should I be using the local file system instead?

Is there a better way to use CEP with multiple patterns, or
are you suggesting I write my own pattern matching?

On Fri, Apr 28, 2017, 11:44 AM Aljoscha Krettek  wrote:

> The problem here is that this will try to open 300 RocksDB instances on
> each of the TMs (depending on how the parallelism is spread between the
> machines this could be more or less). As the exception says, this will open
> too many files because each RocksDB instance has a directory with several
> files in it.
>
> One possible solution would be to increase the limit on open files but I
> don’t think that opening 300 RocksDB instances on one machine is a good
> idea for any size of machine. I think with this many patterns you could
> start thinking about writing the pattern matching yourself and multiplexing
> the several patterns in one stateful function or operator.
>
> @Stefan, what do you think about having this many Rocks instances?
>
> Best,
> Aljoscha
>
> > On 28. Apr 2017, at 17:05, mclendenin  wrote:
> >
> > Starting ~300 CEP patterns with a parallelism of 6, since there are 6 partitions
> > on a Kafka topic. Checkpointing with RocksDB to Hadoop at an interval of 50
> > seconds. The cluster is HA with 2 JMs and 5 TMs. Getting the following exception:
> >
> >
> > java.io.IOException: Error creating ColumnFamilyHandle.
> >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:830)
> >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createValueState(RocksDBKeyedStateBackend.java:838)
> >     at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:251)
> >     at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
> >     at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
> >     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:248)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:557)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:542)
> >     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.open(AbstractKeyedCEPPatternOperator.java:102)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: org.rocksdb.RocksDBException: IO error: /flink/tmp/flink-io-c60bed30-5ca4-4eed-b5b4-14f3c945a46a/job-a150d7d59aafadcf922f2f397c59d6d1_op-KeyedCEPPatternOperator_874_3_uuid-3f3fea55-1af6-43fe-8e20-b213a8e06d28/db/MANIFEST-06: Too many open files
> >     at org.rocksdb.RocksDB.createColumnFamily(Native Method)
> >     at org.rocksdb.RocksDB.createColumnFamily(RocksDB.java:1323)
> >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:823)
> >     ... 12 more
> >
> >
> >
> >
>
>


Re: RocksDB error with flink 1.2.0

2017-04-28 Thread mclendenin
This is the stack trace I'm getting when checkpointing to HDFS. It happens
roughly once every 3 checkpoints, and I don't see it without parallelism.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedCEPPatternOperator -> Flat Map -> Map -> Sink: Unnamed (4/6).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedCEPPatternOperator -> Flat Map -> Map -> Sink: Unnamed (4/6).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
    ... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1010)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:974)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:79)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
    ... 7 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:///user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:580)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:410)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:298)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:277)
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:37)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
    ... 5 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-6/6312cc6f-a60f-4458-8d0b-0455d69d3048 could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1547)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem

Re: RocksDB error with flink 1.2.0

2017-04-28 Thread mclendenin
The top-level exception is similar to the one in this Jira issue, but the root
exception is different. The Jira issue says it was fixed in 1.2.0, which is the
version I'm using:

https://issues.apache.org/jira/browse/FLINK-5663 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RocksDB-error-with-flink-1-2-0-tp12897p12908.html


Re: RocksDB error with flink 1.2.0

2017-04-28 Thread mclendenin
There are only 3 nodes in the HDFS cluster and when running fsck it shows the
filesystem as healthy.

$ hdfs fsck /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43/
17/04/28 16:24:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://localhost/fsck?ugi=hadoop&path=%2Fuser%2Fhadoop%2Fflink%2Fcheckpoints%2Fdc2aee563bebce76e420029525c37892%2Fchk-43
FSCK started by hadoop (auth:SIMPLE) from / for path /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43 at Fri Apr 28 16:25:00 EDT 2017
.Status: HEALTHY
 Total size:                    33197 B
 Total dirs:                    1
 Total files:                   5
 Total symlinks:                0 (Files currently being written: 460)
 Total blocks (validated):      5 (avg. block size 6639 B)
 Minimally replicated blocks:   5 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    2
 Average block replication:     3.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          3
 Number of racks:               1
FSCK ended at Fri Apr 28 16:25:00 EDT 2017 in 13 milliseconds


The filesystem under path '/user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43' is HEALTHY



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RocksDB-error-with-flink-1-2-0-tp12897p12909.html


Re: RocksDB MapState debugging key serialization

2021-06-30 Thread Yuval Itzchakov
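Here is what the documentation on RocksDBStateBackend
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-embeddedrocksdbstatebackend>
says: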

The EmbeddedRocksDBStateBackend holds in-flight data in a RocksDB database
that is (per default) stored in the TaskManager local data directories.
Unlike storing java objects in HashMapStateBackend, data is stored as
serialized byte arrays, which are mainly defined by the type
serializer, *resulting
in key comparisons being byte-wise instead of using Java’s hashCode() and
equals() methods.*

This means that if your keys are not byte-wise equivalent, they won't be
matched.
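A minimal, self-contained illustration of the difference: two keys that are equal under equals()/hashCode() can still serialize to different bytes. The "serializer" here is a hand-written stand-in that writes set elements in iteration order; it is not Flink's or Kryo's serializer, and all names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

class ByteWiseKeyDemo {
    // Naive stand-in serializer: writes the elements in iteration order.
    static byte[] serializeInIterationOrder(Set<String> key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(key.size());
            for (String element : key) {
                out.writeUTF(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deterministic variant: sorts the elements first, so equal sets give equal bytes.
    static byte[] serializeSorted(Set<String> key) {
        return serializeInIterationOrder(new TreeSet<>(key));
    }

    public static void main(String[] args) {
        Set<String> a = new LinkedHashSet<>(Arrays.asList("x", "y"));
        Set<String> b = new LinkedHashSet<>(Arrays.asList("y", "x"));

        System.out.println("equals/hashCode match: "
                + (a.equals(b) && a.hashCode() == b.hashCode()));
        System.out.println("naive bytes match:     "
                + Arrays.equals(serializeInIterationOrder(a), serializeInIterationOrder(b)));
        System.out.println("sorted bytes match:    "
                + Arrays.equals(serializeSorted(a), serializeSorted(b)));
    }
}
```

With a byte-wise key comparison, the first pair of serializations would be treated as two different map keys even though the Java objects compare as equal.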

On Wed, Jun 30, 2021 at 7:37 PM Thomas Breloff  wrote:

> Hello,
>
> I am having trouble with a Flink job which is configured using a RocksDB
> state backend.
>
> Tl;dr: How can I debug the key serialization for RocksDB MapState for a
> deployed Flink job?
>
> Details:
>
> When I “put” a key/value pair into a MapState, and then later try to “get”
> using a key which has the same hashCode/equals as what I put in, I get back
> “null”.
>
> Some things I have verified:
>
>    - Looping over the “keys()” or “entries()” of the MapState contains
>    the expected key (which matches both hashCode and equals)
>    - If I “put” the same key that I’m attempting to “get” with, and then
>    look at the “entries”, then both of the keys appear in the map.
>
> I think I understand that the RocksDB version of MapState will use the
> serialized keys, however I have tested what I think is the serializer and
> it returns the same serialization for both objects.
>
> How can I find out the serialized values that are being used for key
> comparison? Can you recommend any possible solutions or debugging
> strategies that would help?
>
> Thank you,
>
> Tom
>


-- 
Best Regards,
Yuval Itzchakov.


Re: RocksDB MapState debugging key serialization

2021-06-30 Thread Thomas Breloff
Thanks Yuval.  Indeed it was a serialization issue.  I followed the 
instructions in the docs to set up a local test environment with RocksDB that I 
was able to set a breakpoint in and step through.

I discovered that my key was not properly registered with the Kryo serializer 
and the default FieldSerializer was not producing byte-wise equal 
serializations.

Thanks for the prompt response!
Tom

From: Yuval Itzchakov 
Date: Wednesday, June 30, 2021 at 12:56 PM
To: Thomas Breloff 
Cc: user@flink.apache.org 
Subject: Re: RocksDB MapState debugging key serialization
Here is what the documentation on 
RocksDBStateBackend<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-embeddedrocksdbstatebackend>
 says:

The EmbeddedRocksDBStateBackend holds in-flight data in a RocksDB database that 
is (per default) stored in the TaskManager local data directories.
Unlike storing java objects in HashMapStateBackend, data is stored as 
serialized byte arrays, which are mainly defined by the type serializer, 
resulting in key comparisons being byte-wise instead of using Java’s hashCode() 
and equals() methods.

This means that if your keys are not byte-wise equivalent, they won't be 
matched.

On Wed, Jun 30, 2021 at 7:37 PM Thomas Breloff <t...@ec.ai> wrote:
Hello,
I am having trouble with a Flink job which is configured using a RocksDB state 
backend.

Tl;dr: How can I debug the key serialization for RocksDB MapState for a 
deployed Flink job?

Details:

When I “put” a key/value pair into a MapState, and then later try to “get” 
using a key which has the same hashCode/equals as what I put in, I get back 
“null”.

Some things I have verified:


  *   Looping over the “keys()” or “entries()” of the MapState contains the 
expected key (which matches both hashCode and equals)
  *   If I “put” the same key that I’m attempting to “get” with, and then look 
at the “entries”, then both of the keys appear in the map.

I think I understand that the RocksDB version of MapState will use the 
serialized keys, however I have tested what I think is the serializer and it 
returns the same serialization for both objects.

How can I find out the serialized values that are being used for key 
comparison? Can you recommend any possible solutions or debugging strategies 
that would help?

Thank you,
Tom


--
Best Regards,
Yuval Itzchakov.


Re: RocksDB savepoint recovery performance improvements

2020-05-18 Thread Yun Tang
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables, as I 
planned to use RocksDB's benchmark to mock a scenario in Flink. However, I found 
the main challenge is how to ensure the keys are inserted in a strictly 
increasing order. The key order in Java could differ from the byte order in 
RocksDB. In your case, I think it could be much easier, as 
RocksFullSnapshotStrategy writes data per column family per key group, which 
should be in a strictly increasing order [1].
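To illustrate how Java order and byte order can disagree, here is a small sketch using signed ints. It assumes a big-endian encoding and an unsigned, lexicographic byte comparison (the behaviour of RocksDB's default bytewise comparator). The sign-bit flip is one common way to make the two orders agree; it is shown only as an example and is not a claim about what Flink's serializers do. All names are hypothetical.

```java
import java.nio.ByteBuffer;

class KeyOrderDemo {
    // Plain big-endian encoding of a signed int.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Order-preserving encoding: flipping the sign bit makes negative values
    // sort before positive ones under an unsigned byte comparison.
    static byte[] encodeOrdered(int value) {
        return encode(value ^ 0x80000000);
    }

    // Unsigned lexicographic comparison of two byte arrays.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        System.out.println("Java order, -1 vs 1:       " + Integer.compare(-1, 1));
        System.out.println("byte order, plain:         " + compareBytes(encode(-1), encode(1)));
        System.out.println("byte order, sign flipped:  "
                + compareBytes(encodeOrdered(-1), encodeOrdered(1)));
    }
}
```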

FLINK-17288 could mitigate the performance problem, and your solution could 
improve performance much further (and could integrate with the 
state-processor-api story).

On the other hand, for something you can use out of the box in production for 
your scenario, how about recovering from a checkpoint? It also supports 
rescaling and normal recovery.

[1] 
https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308


Best
Yun Tang

From: Joey Pereira 
Sent: Tuesday, May 19, 2020 2:27
To: user@flink.apache.org 
Cc: Mike Mintz ; Shahid Chohan ; Aaron 
Levin 
Subject: RocksDB savepoint recovery performance improvements

Hey,

While running a Flink application with large state, savepoint recovery has 
been a painful part of operating the application because recovery time can be 
several hours. During some profiling that chohan (cc'd) had done, a red flag 
stood out — savepoint recovery consisted mostly of RocksDB Get and Put 
operations.

When Flink is bootstrapping state for RocksDB instances this is not what I 
would have expected, as RocksDB supports direct ingestion of the on-disk format 
(SSTables): 
https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This 
was also recently reported on Jira: 
https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and 
RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

* RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will 
provide atomicity of batches as well as performance benefits for batching, 
compared to individual Puts, but it will still involve RocksDB’s insert paths 
which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, 
writes can be batched even further and avoid expensive operations in RocksDB. 
This is commonly utilized by other systems for restoration or import processes, 
such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some 
restrictions on being able to generate SSTables, as well as limitations for 
ingestion to be performant. Unfortunately, none of it is very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing 
or other ingested files[4]. It is possible to ingest overlapping SSTables, but 
this may incur significant overhead.
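The two restrictions above can be expressed as simple checks over candidate files. The sketch below is plain Java, not the RocksDB API: `KeyRange`, `range` and `nonOverlapping` are hypothetical names, and keys are compared as unsigned bytes on the assumption that the default bytewise comparator is in use.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SstRangeCheck {
    // Hypothetical description of one candidate file: its smallest and largest key.
    static final class KeyRange {
        final byte[] smallest;
        final byte[] largest;

        KeyRange(byte[] smallest, byte[] largest) {
            this.smallest = smallest;
            this.largest = largest;
        }
    }

    // Convenience factory for examples, using UTF-8 bytes of the given strings.
    static KeyRange range(String smallest, String largest) {
        return new KeyRange(smallest.getBytes(StandardCharsets.UTF_8),
                largest.getBytes(StandardCharsets.UTF_8));
    }

    // Unsigned lexicographic comparison of two keys.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Restriction 1: keys written into a single file must be strictly increasing.
    static boolean strictlyIncreasing(List<byte[]> keys) {
        for (int i = 1; i < keys.size(); i++) {
            if (compareBytes(keys.get(i - 1), keys.get(i)) >= 0) {
                return false;
            }
        }
        return true;
    }

    // Restriction 2: after sorting by smallest key, every file must start
    // strictly after the previous file ends.
    static boolean nonOverlapping(List<KeyRange> ranges) {
        List<KeyRange> sorted = new ArrayList<>(ranges);
        sorted.sort((x, y) -> compareBytes(x.smallest, y.smallest));
        for (int i = 1; i < sorted.size(); i++) {
            if (compareBytes(sorted.get(i).smallest, sorted.get(i - 1).largest) <= 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<KeyRange> disjoint = new ArrayList<>();
        disjoint.add(range("d", "f"));
        disjoint.add(range("a", "c"));
        List<KeyRange> overlapping = new ArrayList<>();
        overlapping.add(range("a", "d"));
        overlapping.add(range("c", "f"));
        System.out.println("disjoint ok:    " + nonOverlapping(disjoint));
        System.out.println("overlapping ok: " + nonOverlapping(overlapping));
    }
}
```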

To generate SSTables with non-overlapping key-ranges and to create them with 
keys in-order, it would mean that the savepoints would need to be ordered while 
processing them. I'm unsure if this is the case for how Flink's savepoints are 
stored.

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used 
(eg: for incremental checkpoint or something else). I did notice it is 
iterating over a temporary RocksDB instance and inserting into a "final" 
instance. These writes could be optimized in a similar manner. Alternatively, 
it could be possible to use the temporary instance's SSTables, ingest them, and 
prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach of 
making an interface for RocksDBWriteBatchWrapper and swapping the 
implementation for one that does SSTable generation and ingestion. I reckon 
that will be an easy way to sanity check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the 
difference for ingestion scenarios, as RocksDB itself is sparse on details 
about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that 
implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don’t have any specific sources on this. At a high-level, some of the 
operations happening during writes include writing to the memtable before 
flushing to an SSTable and doing merging and/or compaction. In general, these 
will add write-amplification and overall overhead to bulk insertion. These can 
largely be avoided by giving RocksDB SSTables, especially if they have 
non-overlapping key-ranges.  "Characterizing, Modeling,

Re: RocksDB savepoint recovery performance improvements

2020-05-18 Thread Joey Pereira
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with
that in mind.

We have already begun using checkpoints for recovery. Having these
improvements would still be immensely helpful to reduce downtime for
savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang  wrote:

> Hi Joey
>
> Previously, I also looked at the mechanism to create on-disk SSTables, as I
> planned to use RocksDB's benchmark to mock a scenario in Flink. However, I
> found the main challenge is how to ensure the keys are inserted in a
> strictly increasing order. The key order in Java could differ from the
> byte order in RocksDB. In your case, I think it could be much easier, as
> RocksFullSnapshotStrategy writes data per column family per key group, which
> should be in a strictly increasing order [1].
>
> FLINK-17288 could mitigate the performance problem, and your solution could
> improve performance much further (and could integrate with the
> state-processor-api story).
>
> On the other hand, for something you can use out of the box in production
> for your scenario, how about recovering from a checkpoint? It also supports
> rescaling and normal recovery.
>
> [1]
> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>
>
> Best
> Yun Tang
> --
> *From:* Joey Pereira 
> *Sent:* Tuesday, May 19, 2020 2:27
> *To:* user@flink.apache.org 
> *Cc:* Mike Mintz ; Shahid Chohan ;
> Aaron Levin 
> *Subject:* RocksDB savepoint recovery performance improvements
>
> Hey,
>
> While running a Flink application with large state, savepoint recovery
> has been a painful part of operating the application because recovery time
> can be several hours. During some profiling that chohan (cc'd) had done, a
> red flag stood out — savepoint recovery consisted mostly of RocksDB Get and
> Put operations.
>
> When Flink is bootstrapping state for RocksDB instances this is not what I
> would have expected, as RocksDB supports direct ingestion of the on-disk
> format (SSTables):
> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. 
> This
> was also recently reported on Jira:
> https://issues.apache.org/jira/browse/FLINK-17288.
>
> From what I understood of the current implementation:
>
> * The snapshot restoration pathways, RocksDBFullRestoreOperation and 
> RocksDBIncrementalRestoreOperation,
> use RocksDBWriteBatchWrapper.
>
> * RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This
> will provide atomicity of batches as well as performance benefits for
> batching, compared to individual Puts, but it will still involve RocksDB’s
> insert paths which can involve expensive operations[0].
>
> Instead, by creating SSTable files and instructing RocksDB to ingest the
> files, writes can be batched even further and avoid expensive operations in
> RocksDB. This is commonly utilized by other systems for restoration or
> import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There
> are some restrictions on being able to generate SSTables, as well as
> limitations for ingestion to be performant. Unfortunately, none of it is
> very well documented:
>
> 1. When generating an SSTable, keys need to be inserted in-order.
>
> 2. Ingested files should not have key-ranges that overlap with either
> existing or other ingested files[4]. It is possible to ingest overlapping
> SSTables, but this may incur significant overhead.
>
> To generate SSTables with non-overlapping key-ranges and to create them
> with keys in-order, it would mean that the savepoints would need to be
> ordered while processing them. I'm unsure if this is the case for how
> Flink's savepoints are stored.
>
> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is
> used (eg: for incremental checkpoint or something else). I did notice it
> is iterating over a temporary RocksDB instance and inserting into a "final"
> instance. These writes could be optimized in a similar manner.
> Alternatively, it could be possible to use the temporary instance's
> SSTables, ingest them, and prune data out with RocksDB's DeleteRange.
>
> To get started with prototyping, I was thinking of taking a simple
> approach of making an interface for RocksDBWriteBatchWrapper and swapping
> the implementation for one that does SSTable generation and ingestion. I
> reckon that will be an easy way to sanity check whether it works at all.
>
> I was also planning on writing some benchmarks in RocksDB to understand
> the difference for ingestion scenarios, as RocksDB itself is sparse on
> details about SSTable ingestion[4] and does not have benchmarking for
> ingestion.
>
> Does all of that seem sound? I'll report back when I get time to work out
> that implementation and tests, likely during the coming weekend.
>
>
> Joey
>
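
The two restrictions listed in the message above (keys written in order, and
non-overlapping key ranges between files) can be sketched without RocksDB at
all. The following is a minimal, hypothetical planner and not Flink or RocksDB
code: the class and method names are made up for illustration, it assumes the
default bytewise (unsigned, lexicographic) key order and distinct keys, and it
only decides which keys would go into which file. A real implementation would
presumably hand each chunk to RocksDB's SST file writer and then ingest the
resulting files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy planner (hypothetical): orders keys bytewise and cuts them into disjoint chunks. */
final class SstChunkPlanner {

    /** Returns a copy of the keys sorted in unsigned lexicographic byte order. */
    static List<byte[]> sortBytewise(List<byte[]> keys) {
        List<byte[]> sorted = new ArrayList<>(keys);
        sorted.sort((a, b) -> Arrays.compareUnsigned(a, b));
        return sorted;
    }

    /** Cuts a key list into consecutive chunks of at most maxKeys keys (maxKeys must be > 0). */
    static List<List<byte[]>> chunk(List<byte[]> keys, int maxKeys) {
        List<List<byte[]>> chunks = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += maxKeys) {
            int end = Math.min(i + maxKeys, keys.size());
            chunks.add(new ArrayList<>(keys.subList(i, end)));
        }
        return chunks;
    }

    /** True if the last key of every chunk is strictly smaller than the first key of the next chunk. */
    static boolean rangesAreDisjoint(List<List<byte[]>> chunks) {
        for (int i = 0; i + 1 < chunks.size(); i++) {
            List<byte[]> current = chunks.get(i);
            byte[] last = current.get(current.size() - 1);
            byte[] nextFirst = chunks.get(i + 1).get(0);
            if (Arrays.compareUnsigned(last, nextFirst) >= 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<byte[]> keys = Arrays.asList(
                new byte[] {0x02}, new byte[] {(byte) 0x80}, new byte[] {0x01}, new byte[] {0x7f});
        List<List<byte[]>> planned = chunk(sortBytewise(keys), 2);
        System.out.println("chunks: " + planned.size());
        System.out.println("disjoint: " + rangesAreDisjoint(planned));
    }
}
```

If the input is already ordered, as may be the case for a full snapshot, the
sort step disappears and only the chunking remains, which is what makes the
ingestion route attractive for restore.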

Re: RocksDB savepoint recovery performance improvements

2020-05-26 Thread Joey Pereira
Following up: I've put together the implementation,
https://github.com/apache/flink/pull/12345. It's passing tests but is
only partially complete, as it still needs some clean-up and configuration.
I still need to try running this against a production cluster to check the
performance, as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira  wrote:

> Thanks Yun for highlighting this, it's very helpful! I'll give it a go
> with that in mind.
>
> We have already begun using checkpoints for recovery. Having these
> improvements would still be immensely helpful to reduce downtime for
> savepoint recovery.
>
> On Mon, May 18, 2020 at 3:14 PM Yun Tang  wrote:
>
>> Hi Joey
>>
>> Previously, I also looked at the mechanism to create on-disk SSTables as
>> I planed to use RocksDB's benchmark to mock scenario in Flink. However, I
>> found the main challenge is how to ensure the keys are inserted in a
>> strictly increasing order. The key order in java could differ from the
>> bytes order in RocksDB. In your case, I think it could be much easier as
>> RocksFullSnapshotStrategy write data per columnfamily per key group which
>> should be in a strictly increasing order [1].
>>
>> FLINK-17288  could
>> mitigate the performance and your solution could help improve the
>> performance much better (and could integrate with state-processor-api
>> story).
>>
>> On the other hand, for out-of-box to use in production for your scenario,
>> how about using checkpoint to recover, as it also supports rescale and
>> normal recover.
>>
>> [1]
>> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>
>>
>> Best
>> Yun Tang
>> --
>> *From:* Joey Pereira 
>> *Sent:* Tuesday, May 19, 2020 2:27
>> *To:* user@flink.apache.org 
>> *Cc:* Mike Mintz ; Shahid Chohan ;
>> Aaron Levin 
>> *Subject:* RocksDB savepoint recovery performance improvements
>>
>> Hey,
>>
>> While running a Flink application with a large-state, savepoint recovery
>> has been a painful part of operating the application because recovery time
>> can be several hours. During some profiling that chohan (cc'd) had done, a
>> red flag stood out — savepoint recovery consisted mostly of RocksDB Get and
>> Put operations.
>>
>> When Flink is bootstrapping state for RocksDB instances this is not what
>> I would have expected, as RocksDB supports direct ingestion of the on-disk
>> format (SSTables):
>> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. 
>> This
>> was also recently reported on Jira:
>> https://issues.apache.org/jira/browse/FLINK-17288.
>>
>> From what I understood of the current implementation:
>>
>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and 
>> RocksDBIncrementalRestoreOperation,
>> use RocksDBWriteBatchWrapper.
>>
>> * RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This
>> will provide atomicity of batches as well as performance benefits for
>> batching, compared to individual Puts, but it will still involve RocksDB’s
>> insert paths which can involve expensive operations[0].
>>
>> Instead, by creating SSTable files and instructing RocksDB to ingest the
>> files, writes can be batched even further and avoid expensive operations in
>> RocksDB. This is commonly utilized by other systems for restoration or
>> import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There
>> are some restrictions on being able to generate SSTables, as well as
>> limitations for ingestion to be performant. Unfortunately, it’s all not
>> very well documented:
>>
>> 1. When generating an SSTable, keys need to be inserted in-order.
>>
>> 2. Ingested files should not have key-ranges that overlap with either
>> existing or other ingested files[4]. It is possible to ingest overlapping
>> SSTables, but this may incur significant overhead.
>>
>> To generate SSTables with non-overlapping key-ranges and to create them
>> with keys in-order, it would mean that the savepoints would need to be
>> ordered while processing them. I'm unsure if this is the case for how
>> Flink's savepoints are stored.
>>
>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is
>> used (eg: for incremental checkpoint or something else). I did notice it
>> is iterating over a temporary RocksDB instance and inserting into a "final"
>> instance. These writes could be optimized in a similar manner.
>> Alternatively, it could be possible to use the temporary instance's
>> SSTables, ingest them, and prune data out with RocksDB's DeleteRange.
>>
>> To get started with prototyping, I was thinking of taking a simple
>> approach of making an interface for RocksDBWriteBatchWrapper and swapping
>> the implementation for one that does SSTable genera

Re: RocksDB savepoint recovery performance improvements

2020-05-26 Thread Steven Wu
Yun, you mentioned that checkpoint also supports rescale. I thought the
recommendation [1] is to use savepoint for rescale.

[1]
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira  wrote:

> Following up: I've put together the implementation,
> https://github.com/apache/flink/pull/12345. It's passing tests but is
> only partially complete, as it still needs some clean-up and configuration.
> I still need to try running this against a production cluster to check the
> performance, as well as getting some RocksDB benchmarks.
>
> On Mon, May 18, 2020 at 3:46 PM Joey Pereira  wrote:
>
>> Thanks Yun for highlighting this, it's very helpful! I'll give it a go
>> with that in mind.
>>
>> We have already begun using checkpoints for recovery. Having these
>> improvements would still be immensely helpful to reduce downtime for
>> savepoint recovery.
>>
>> On Mon, May 18, 2020 at 3:14 PM Yun Tang  wrote:
>>
>>> Hi Joey
>>>
>>> Previously, I also looked at the mechanism to create on-disk SSTables as
>>> I planed to use RocksDB's benchmark to mock scenario in Flink. However, I
>>> found the main challenge is how to ensure the keys are inserted in a
>>> strictly increasing order. The key order in java could differ from the
>>> bytes order in RocksDB. In your case, I think it could be much easier as
>>> RocksFullSnapshotStrategy write data per columnfamily per key group which
>>> should be in a strictly increasing order [1].
>>>
>>> FLINK-17288  could
>>> mitigate the performance and your solution could help improve the
>>> performance much better (and could integrate with state-processor-api
>>> story).
>>>
>>> On the other hand, for out-of-box to use in production for your
>>> scenario, how about using checkpoint to recover, as it also supports
>>> rescale and normal recover.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>>
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Joey Pereira 
>>> *Sent:* Tuesday, May 19, 2020 2:27
>>> *To:* user@flink.apache.org 
>>> *Cc:* Mike Mintz ; Shahid Chohan <
>>> cho...@stripe.com>; Aaron Levin 
>>> *Subject:* RocksDB savepoint recovery performance improvements
>>>
>>> Hey,
>>>
>>> While running a Flink application with a large-state, savepoint recovery
>>> has been a painful part of operating the application because recovery time
>>> can be several hours. During some profiling that chohan (cc'd) had done, a
>>> red flag stood out — savepoint recovery consisted mostly of RocksDB Get and
>>> Put operations.
>>>
>>> When Flink is bootstrapping state for RocksDB instances this is not what
>>> I would have expected, as RocksDB supports direct ingestion of the on-disk
>>> format (SSTables):
>>> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. 
>>> This
>>> was also recently reported on Jira:
>>> https://issues.apache.org/jira/browse/FLINK-17288.
>>>
>>> From what I understood of the current implementation:
>>>
>>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and 
>>> RocksDBIncrementalRestoreOperation,
>>> use RocksDBWriteBatchWrapper.
>>>
>>> * RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This
>>> will provide atomicity of batches as well as performance benefits for
>>> batching, compared to individual Puts, but it will still involve RocksDB’s
>>> insert paths which can involve expensive operations[0].
>>>
>>> Instead, by creating SSTable files and instructing RocksDB to ingest the
>>> files, writes can be batched even further and avoid expensive operations in
>>> RocksDB. This is commonly utilized by other systems for restoration or
>>> import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There
>>> are some restrictions on being able to generate SSTables, as well as
>>> limitations for ingestion to be performant. Unfortunately, it’s all not
>>> very well documented:
>>>
>>> 1. When generating an SSTable, keys need to be inserted in-order.
>>>
>>> 2. Ingested files should not have key-ranges that overlap with either
>>> existing or other ingested files[4]. It is possible to ingest overlapping
>>> SSTables, but this may incur significant overhead.
>>>
>>> To generate SSTables with non-overlapping key-ranges and to create them
>>> with keys in-order, it would mean that the savepoints would need to be
>>> ordered while processing them. I'm unsure if this is the case for how
>>> Flink's savepoints are stored.
>>>
>>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it
>>> is used (eg: for incremental checkpoint or something else). I did
>>> notice it is iterating over a temporary RocksDB instance and inserting into
>>> a "final” instance. These

Re: RocksDB savepoint recovery performance improvements

2020-05-26 Thread Yun Tang
@Joey Pereira I think you should create a new JIRA ticket and link your PR to 
it, as FLINK-17288 <https://issues.apache.org/jira/browse/FLINK-17288> mainly 
focuses on bulk-load options while your solution focuses on SST generation. If 
your solution behaves better, we could close FLINK-17288 as "won't do".

@Steven Wu Sure, the Flink community always suggests using a savepoint to 
restore, but checkpoints currently support it as well. I mentioned it as a 
quick fix for his scenario.

Best
Yun Tang

From: Steven Wu 
Sent: Wednesday, May 27, 2020 0:36
To: Joey Pereira 
Cc: user@flink.apache.org ; Yun Tang ; 
Mike Mintz ; Shahid Chohan ; Aaron 
Levin 
Subject: Re: RocksDB savepoint recovery performance improvements

Yun, you mentioned that checkpoint also supports rescale. I thought the 
recommendation [1] is to use savepoint for rescale.

[1] 
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira <j...@stripe.com> wrote:
Following up: I've put together the implementation, 
https://github.com/apache/flink/pull/12345. It's passing tests but is only 
partially complete, as it still needs some clean-up and configuration. I still 
need to try running this against a production cluster to check the performance, 
as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <j...@stripe.com> wrote:
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with 
that in mind.

We have already begun using checkpoints for recovery. Having these improvements 
would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <myas...@live.com> wrote:
Hi Joey

Previously, I also looked at the mechanism for creating on-disk SSTables, as I 
planned to use RocksDB's benchmark to mock the scenario in Flink. However, I 
found the main challenge is how to ensure the keys are inserted in strictly 
increasing order. The key order in Java could differ from the byte order in 
RocksDB. In your case, I think it could be much easier, as 
RocksFullSnapshotStrategy writes data per column family per key group, which 
should be in strictly increasing order [1].

FLINK-17288 <https://issues.apache.org/jira/browse/FLINK-17288> could mitigate 
the performance problem, and your solution could improve the performance much 
further (and could integrate with the state-processor-api story).

On the other hand, as an out-of-the-box option for production in your 
scenario, how about using checkpoints to recover, as they also support 
rescaling and normal recovery?

[1] 
https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308


Best
Yun Tang
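
The remark above that the key order in Java can differ from the byte order in
RocksDB is easy to reproduce with plain integers. The sketch below is only an
illustration: it assumes keys are serialized as 4-byte big-endian values and
compared bytewise as unsigned bytes (the behaviour of RocksDB's default
comparator). KeyOrderDemo and its methods are hypothetical names, not Flink
code.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Demonstrates that Java's int order and the order of the encoded bytes can disagree. */
final class KeyOrderDemo {

    /** Encodes an int as 4 big-endian bytes (ByteBuffer's default byte order). */
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Sign of the comparison in Java's natural (signed) int order. */
    static int javaOrder(int a, int b) {
        return Integer.signum(Integer.compare(a, b));
    }

    /** Sign of the comparison of the encoded keys in unsigned lexicographic byte order. */
    static int byteOrder(int a, int b) {
        return Integer.signum(Arrays.compareUnsigned(encode(a), encode(b)));
    }

    public static void main(String[] args) {
        // -1 sorts before 1 as a Java int, but its encoding FF FF FF FF
        // sorts after 00 00 00 01 when compared byte by byte.
        System.out.println("java order of (-1, 1): " + javaOrder(-1, 1));
        System.out.println("byte order of (-1, 1): " + byteOrder(-1, 1));
    }
}
```

For non-negative values the two orders agree under this encoding, so the
mismatch only shows up once negative numbers (or other encodings) are involved.
Sorting by the serialized bytes rather than by the Java objects avoids the
problem.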

From: Joey Pereira <j...@stripe.com>
Sent: Tuesday, May 19, 2020 2:27
To: user@flink.apache.org <user@flink.apache.org>
Cc: Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <cho...@stripe.com>; 
Aaron Levin <aaronle...@stripe.com>
Subject: RocksDB savepoint recovery performance improvements

Hey,

While running a Flink application with a large-state, savepoint recovery has 
been a painful part of operating the application because recovery time can be 
several hours. During some profiling that chohan (cc'd) had done, a red flag 
stood out — savepoint recovery consisted mostly of RocksDB Get and Put 
operations.

When Flink is bootstrapping state for RocksDB instances this is not what I 
would have expected, as RocksDB supports direct ingestion of the on-disk format 
(SSTables): 
https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This 
was also recently reported on Jira: 
https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and 
RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

* RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will 
provide atomicity of batches as well as performance benefits for batching, 
compared to individual Puts, but it will still involve RocksDB’s insert paths 
which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, 
writes can be batched even further and avoid expensive operations in RocksDB. 
This is commonly utilized by other systems for restoration or import processes, 
such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some 

Re: Rocksdb - Incremental vs full checkpoints

2020-10-13 Thread Yun Tang
Hi

The difference in data size between incremental and full checkpoints is due to 
their different implementations.
The incremental checkpoint strategy uploads binary sst files, while the full 
checkpoint strategy scans the DB and writes all kv entries to the external DFS.

As your state size is really small (only 200 KB), I think your RocksDB has 
never triggered a compaction to reduce the sst files, which is why the size 
keeps increasing.

Best
Yun Tang

From: sudranga 
Sent: Wednesday, October 14, 2020 10:40
To: user@flink.apache.org 
Subject: Rocksdb - Incremental vs full checkpoints

Hi,
I have an event-window pipeline which handles a fixed number of messages per
second for a fixed number of keys. When I have rocksdb as the state backend
with incremental checkpoints, I see the delta checkpoint size constantly
increase. Please see
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2790/Screen_Shot_2020-10-13_at_6.png>

I turned off incremental checkpoints and all the checkpoints are 64kb (There
appears to be no state leak in user code or otherwise). It is not clear why
the incremental checkpoints keep increasing in size. Perhaps, the
incremental checkpoints are not incremental(for this small state size) and
are simply full state appended to full state and so on...

From some posts on this forum, I understand the use case for incremental
checkpoints is designed when the state size is fairly large (Gbs-Tbs) and
where the changes in state are minimal across checkpoints. However, does
this mean that we should not enable incremental checkpointing for use cases
where the state size is much smaller? Would the 'constantly' increasing
snapshot delta size reduce at some point?  I don't see any compaction runs
happening
(taskmanager_job_task_operator_column_family_rocksdb.num-running-compactions).
Not sure if that is what I am missing...

Thanks
Sudharsan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Rocksdb - Incremental vs full checkpoints

2020-10-25 Thread Sudharsan R
Hi Yun,
Sorry for the late reply - I was doing some reading. As far as I
understand, when incremental checkpointing is enabled, the reported
checkpoint size (metrics/UI) is only the size of the deltas and not the full
state size. I understand that compaction may not get triggered. But, if we
are creating a fixed amount of state every checkpoint interval,
shouldn't the reported checkpoint size remain the same (as it is a delta)?


Thanks

Sudharsan

On Tue, Oct 13, 2020 at 11:34 PM Yun Tang  wrote:

> Hi
>
> This difference of data size of incremental vs full checkpoint is due to
> the different implementations.
> The incremental checkpoint strategy upload binary sst files while full
> checkpoint strategy scans the DB and write all kv entries to external DFS.
>
> As your state size is really small (only 200 KB), I think your RocksDB has
> not ever triggered compaction to reduce sst files, that's why the size
> constantly increase.
>
> Best
> Yun Tang
> --
> *From:* sudranga 
> *Sent:* Wednesday, October 14, 2020 10:40
> *To:* user@flink.apache.org 
> *Subject:* Rocksdb - Incremental vs full checkpoints
>
> Hi,
> I have an event-window pipeline which handles a fixed number of messages
> per
> second for a fixed number of keys. When i have rocksdb as the state backend
> with incremental checkpoints, i see the delta checkpoint size constantly
> increase. Please see
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2790/Screen_Shot_2020-10-13_at_6.png>
>
>
> I turned off incremental checkpoints and all the checkpoints are 64kb
> (There
> appears to be no state leak in user code or otherwise). It is not clear why
> the incremental checkpoints keep increasing in size. Perhaps, the
> incremental checkpoints are not incremental(for this small state size) and
> are simply full state appended to full state and so on...
>
> From some posts on this forum, I understand the use case for incremental
> checkpoints is designed when the state size is fairly large (Gbs-Tbs) and
> where the changes in state are minimal across checkpoints. However, does
> this mean that we should not enable incremental checkpointing for use cases
> where the state size is much smaller? Would the 'constantly' increasing
> snapshot delta size reduce at some point?  I don't see any compaction runs
> happening
>
> (taskmanager_job_task_operator_column_family_rocksdb.num-running-compactions).
> Not sure if that is what I am missing...
>
> Thanks
> Sudharsan
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Rocksdb - Incremental vs full checkpoints

2020-10-26 Thread Yun Tang
Hi Sudharsan

Once incremental checkpointing is enabled, the delta size is the size of the 
newly uploaded sst files, which might not always be the same given RocksDB's 
compression ratio, compaction timing, and flush timing. If you really want to 
check the details, you could log in to the machine and locate the state 
directory to see how the sst files are stored for each checkpoint when local 
recovery is enabled [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery

Best
Yun Tang
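
To make the point about the delta size concrete, here is a small toy model. It
is a sketch under stated assumptions, not Flink's implementation: it assumes
an incremental checkpoint uploads exactly those sst files that no earlier
checkpoint has uploaded, and the class name, file names, and sizes are all
hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model (hypothetical) of the bytes uploaded by successive incremental checkpoints. */
final class IncrementalDeltaModel {

    /** Names of the sst files that some earlier checkpoint has already uploaded. */
    private final Set<String> alreadyUploaded = new HashSet<>();

    /**
     * Takes the sst files that are live at checkpoint time (file name to size in bytes)
     * and returns the delta: the total size of the files not uploaded before.
     */
    long checkpoint(Map<String, Long> liveSstFiles) {
        long delta = 0;
        for (Map.Entry<String, Long> file : liveSstFiles.entrySet()) {
            if (alreadyUploaded.add(file.getKey())) {
                delta += file.getValue();
            }
        }
        return delta;
    }

    public static void main(String[] args) {
        IncrementalDeltaModel model = new IncrementalDeltaModel();
        // One flushed file per interval: the delta is the size of the new file only.
        System.out.println(model.checkpoint(Map.of("000001.sst", 64L)));
        System.out.println(model.checkpoint(Map.of("000001.sst", 64L, "000002.sst", 64L)));
        // A compaction replaces both files with a merged one, which counts as new.
        System.out.println(model.checkpoint(Map.of("000003.sst", 100L)));
        // Nothing changed since the last checkpoint: nothing new to upload.
        System.out.println(model.checkpoint(Map.of("000003.sst", 100L)));
    }
}
```

In this model the delta depends on which files happen to exist at checkpoint
time rather than on how much logical state changed, which is one way to read
the variation caused by compaction and flush timing.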

From: Sudharsan R 
Sent: Monday, October 26, 2020 10:38
To: Yun Tang 
Cc: user@flink.apache.org 
Subject: Re: Rocksdb - Incremental vs full checkpoints

Hi Yun,
Sorry for the late reply - I was doing some reading. As far as i understand, 
when incremental checkpointing is enabled, the reported checkpoint 
size(metrics/UI) is only the size of the deltas and not the full state size. I 
understand that compaction may not get triggered. But, if we are creating a 
fixed amount of state every checkpoint interval, shouldn't the reported 
checkpoint size remain the same(as it is a delta)?


Thanks

Sudharsan

On Tue, Oct 13, 2020 at 11:34 PM Yun Tang <myas...@live.com> wrote:
Hi

This difference of data size of incremental vs full checkpoint is due to the 
different implementations.
The incremental checkpoint strategy upload binary sst files while full 
checkpoint strategy scans the DB and write all kv entries to external DFS.

As your state size is really small (only 200 KB), I think your RocksDB has not 
ever triggered compaction to reduce sst files, that's why the size constantly 
increase.

Best
Yun Tang

From: sudranga <sud.r...@gmail.com>
Sent: Wednesday, October 14, 2020 10:40
To: user@flink.apache.org <user@flink.apache.org>
Subject: Rocksdb - Incremental vs full checkpoints

Hi,
I have an event-window pipeline which handles a fixed number of messages per
second for a fixed number of keys. When i have rocksdb as the state backend
with incremental checkpoints, i see the delta checkpoint size constantly
increase. Please see
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2790/Screen_Shot_2020-10-13_at_6.png>

I turned off incremental checkpoints and all the checkpoints are 64kb (There
appears to be no state leak in user code or otherwise). It is not clear why
the incremental checkpoints keep increasing in size. Perhaps, the
incremental checkpoints are not incremental(for this small state size) and
are simply full state appended to full state and so on...

From some posts on this forum, I understand the use case for incremental
checkpoints is designed when the state size is fairly large (Gbs-Tbs) and
where the changes in state are minimal across checkpoints. However, does
this mean that we should not enable incremental checkpointing for use cases
where the state size is much smaller? Would the 'constantly' increasing
snapshot delta size reduce at some point?  I don't see any compaction runs
happening
(taskmanager_job_task_operator_column_family_rocksdb.num-running-compactions).
Not sure if that is what I am missing...

Thanks
Sudharsan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: RocksDB Number of Keys Metric

2018-09-03 Thread vino yang
Hi Ahmed,

If you feel that this metric is necessary, you can create an issue in JIRA,
so that the problem is more easily seen by the relevant people.
If you need an answer to this question, it may be more effective to ping
@Andrey.

Thanks, vino.

On Sun, Sep 2, 2018 at 2:31 AM, Ahmed wrote:

> Is there a clean way of exposing a metrics regarding the number of keys
> (even if it is an estimate using 'rocksdb.estimate-num-keys') in a rocksdb
> state store? RocksDBValueState is not public to users code.
>
> Best,
> Ahmed
>
>


Re: RocksDB Number of Keys Metric

2018-09-06 Thread Andrey Zagrebin
Hi,

afaik, this option is not exposed in the current state of the source code.
I can see it being useful, and it is technically possible using:
db.getLongProperty(stateColumnFamilyHandle, "rocksdb.estimate-num-keys");

Though a couple of things come to mind that this feature would need to take 
into account:

- we can expose it per state object for heap and rocksdb, e.g. as 
State.estimatekeyNum(), but it would report only the key groups in the 
operator subtask where it is called.

- another way might be to expose it in the Web UI, where we can even sum it up 
for all operator subtasks.

- for all state types currently, "rocksdb.estimate-num-keys" would show the 
total number of all state keys, but for map state this particular metric would 
show the total number of user map keys for all state keys, so it might be 
misleading, and it is technically not so easy to align its semantics for map 
state with the other state types.

We can discuss here on the mailing list how many users would be interested in 
this feature, and then a JIRA ticket can be created to track it if the 
community decides to contribute this feature.

Cheers,
Andrey
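
The caveat about map state can be illustrated with a tiny sketch. It assumes,
as the message above implies, that every user map entry is stored as its own
row identified by the pair (state key, user map key), so that a per-row count
reports map entries rather than state keys. The class and the sample data are
hypothetical; this does not call RocksDB.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy illustration (hypothetical): stored rows versus distinct state keys for map state. */
final class MapStateKeyCount {

    /** Number of stored rows, which is what a per-row key estimate would count. */
    static int storedEntries(List<String[]> entries) {
        return entries.size();
    }

    /** Number of distinct state keys, taken from the first component of each row. */
    static int distinctStateKeys(List<String[]> entries) {
        Set<String> stateKeys = new HashSet<>();
        for (String[] entry : entries) {
            stateKeys.add(entry[0]);
        }
        return stateKeys.size();
    }

    public static void main(String[] args) {
        // Each row is {state key, user map key}.
        List<String[]> entries = Arrays.asList(
                new String[] {"user-1", "a"},
                new String[] {"user-1", "b"},
                new String[] {"user-2", "a"});
        System.out.println("stored entries: " + storedEntries(entries));
        System.out.println("distinct state keys: " + distinctStateKeys(entries));
    }
}
```

For value state the two numbers would coincide under the same assumption,
which is why a single metric name would mean different things for different
state types.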

> On 4 Sep 2018, at 05:49, vino yang  wrote:
> 
> Hi Ahmed,
> 
> If you feel that this metric is necessary, you can create an issue in JIRA, 
> then the problem may be more easily seen by the relevant people. 
> If you need to answer this question, maybe it is more effective to ping 
> @Andrey?
> 
> Thanks, vino.
> 
> On Sun, Sep 2, 2018 at 2:31 AM, Ahmed <aalobaidi...@gmail.com> wrote:
> Is there a clean way of exposing a metrics regarding the number of keys (even 
> if it is an estimate using 'rocksdb.estimate-num-keys') in a rocksdb state 
> store? RocksDBValueState is not public to users code.
> 
> Best,
> Ahmed
> 



Re: RocksDB checkpointing dir per TM

2018-10-26 Thread Andrey Zagrebin
Hi Taher,

TMs keep state locally while running; in this case the RocksDB files already 
belong to the TM.
You can point it to the same NVMe disk location on each node. The relevant 
Flink options here are:
- io.tmp.dirs
- taskmanager.state.local.root-dirs
This data is transient and temporary in nature. It does not survive a job 
failure.

The checkpoint is a logical snapshot of the operator state for all involved 
TMs, so it belongs to the job and is usually uploaded to a distributed file 
system available to all TMs.
The location is set in the Flink option 'state.checkpoints.dir'.
This way the job can restore from it with a different set of TMs.

Best,
Andrey

> On 26 Oct 2018, at 08:29, Taher Koitawala  wrote:
> 
> Hi All,
>   Our current cluster configuration uses one HDD which is mainly for 
> root and an other NVME disk per node, [1]we want make sure all TMs write 
> their own RocksDB files to the NVME disk only, how do we do that? 
> 
> [2] Is it also possible to specify multiple directories per TMs so that we 
> have an even spread when the RocksDB files are written?  
> 
> Thanks,
> Taher Koitawala 



Re: RocksDB checkpointing dir per TM

2018-10-26 Thread Taher Koitawala
Thanks!

On Fri 26 Oct, 2018, 3:31 PM Andrey Zagrebin, 
wrote:

> Hi Taher,
>
> TMs keep state locally while running, in this case RocksDB files already
> belong to TM.
> You can point it to the same NVME disk location on each node, relevant
> Flink options here are:
> - io.tmp.dirs
> - taskmanager.state.local.root-dirs
> This data is transient and has temporary nature. It does not survive a job
> failure.
>
> The checkpoint is a logical snapshot of the operator state for all
> involved TMs,
> so it belongs to the job and usually uploaded to a distributed file system
> available on all TMs.
> The location is set in Flink option ‘state.checkpoints.dir'.
> This way job can restore from it with different set of TMs.
>
> Best,
> Andrey
>
> > On 26 Oct 2018, at 08:29, Taher Koitawala 
> wrote:
> >
> > Hi All,
> >   Our current cluster configuration uses one HDD which is mainly
> for root and an other NVME disk per node, [1]we want make sure all TMs
> write their own RocksDB files to the NVME disk only, how do we do that?
> >
> > [2] Is it also possible to specify multiple directories per TMs so that
> we have an even spread when the RocksDB files are written?
> >
> > Thanks,
> > Taher Koitawala
>
>


Re: RocksDB checkpointing dir per TM

2018-10-26 Thread Elias Levy
There is also state.backend.rocksdb.localdir. Oddly, I can find the
documentation for it in the 1.5 docs, but not in the 1.6 docs. The option is
still in master, and it is used.

On Fri, Oct 26, 2018 at 3:01 AM Andrey Zagrebin 
wrote:

> Hi Taher,
>
> TMs keep state locally while running, in this case RocksDB files already
> belong to TM.
> You can point it to the same NVME disk location on each node, relevant
> Flink options here are:
> - io.tmp.dirs
> - taskmanager.state.local.root-dirs
> This data is transient and has temporary nature. It does not survive a job
> failure.
>
> The checkpoint is a logical snapshot of the operator state for all
> involved TMs,
> so it belongs to the job and usually uploaded to a distributed file system
> available on all TMs.
> The location is set in Flink option ‘state.checkpoints.dir'.
> This way job can restore from it with different set of TMs.
>
> Best,
> Andrey
>
> > On 26 Oct 2018, at 08:29, Taher Koitawala 
> wrote:
> >
> > Hi All,
> >   Our current cluster configuration uses one HDD which is mainly
> for root and an other NVME disk per node, [1]we want make sure all TMs
> write their own RocksDB files to the NVME disk only, how do we do that?
> >
> > [2] Is it also possible to specify multiple directories per TMs so that
> we have an even spread when the RocksDB files are written?
> >
> > Thanks,
> > Taher Koitawala
>
>

