Hi Nico,
Thanks for the detailed explanation. The only change I have made in my 
flink-conf.yaml file is the following.
state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb
The default "state.backend" value is set to filesystem. Neither removing the 
env.setStateBackend() call from the code nor changing the "state.backend" 
property to rocksdb changes the state backend to RocksDB. I verified this by 
looking at the Flink log files; a sample of the log is included below for 
your reference.
-------------------------------------------------------
carbon-5th:38631/user/taskmanager)
 as 1ac63dfb481eab3d3165a965084115f3. Current number of registered hosts is 1. 
Current number of alive task slots is 1.
2018-03-19 23:10:11,606 INFO  org.apache.flink.runtime.client.JobClient         
            - Checking and uploading JAR files
2018-03-19 23:10:11,618 INFO  org.apache.flink.runtime.jobmanager.JobManager    
            - Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink 
application.).
2018-03-19 23:10:11,623 INFO  org.apache.flink.runtime.jobmanager.JobManager    
            - Using restart strategy 
FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
delayBetweenRestartAttempts=10000) for 7c19a14f4e75149ffaa064fac7e2bf29.
2018-03-19 23:10:11,636 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers 
via failover strategy: full graph restart
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager    
            - Running initialization on master for job Flink application. 
(7c19a14f4e75149ffaa064fac7e2bf29).
2018-03-19 23:10:11,648 INFO  org.apache.flink.runtime.jobmanager.JobManager    
            - Successfully ran initialization on master in 0 ms.
2018-03-19 23:10:11,664 INFO  org.apache.flink.runtime.jobmanager.JobManager    
            - Using application-defined state backend for checkpoint/savepoint 
metadata: RocksDB State Backend {isInitialized=false, 
configuredDbBasePaths=null, initializedDbBasePaths=null, 
checkpointStreamBackend=File State Backend @ 
file:/home/ubuntu/tmp-flink-rocksdb}.
2018-03-19 23:10:11,685 INFO  org.apache.flink.runtime.jobmanager.JobManager    
            - Scheduling job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink 
application.).
2018-03-19 23:10:11,685 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink 
application. (7c19a14f4e75149ffaa064fac7e2bf29) switched from state CREATED to 
RUNNING.
2018-03-19 23:10:11,692 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from 
CREATED to SCHEDULED.
2018-03-19 23:10:11,698 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) 
(796fcd9c38c87b6efb6f512e78e626e9) switched from CREATED to SCHEDULED.
2018-03-19 23:10:11,706 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from 
SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,707 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying 
Source: inputStream -> Filter (1/1) (attempt #0) to computer1
2018-03-19 23:10:11,712 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) 
(796fcd9c38c87b6efb6f512e78e626e9) switched from SCHEDULED to DEPLOYING.
2018-03-19 23:10:11,712 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) (attempt 
#0) to computer1
2018-03-19 23:10:12,004 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
inputStream -> Filter (1/1) (ebf95b067eb7624edeb151bcba3d55f8) switched from 
DEPLOYING to RUNNING.
2018-03-19 23:10:12,011 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@43d5ff75},
 CountTrigger(1), 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor@337a4b0a, 
WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed (1/1) 
(796fcd9c38c87b6efb6f512e78e626e9) switched from DEPLOYING to RUNNING.
2018-03-19 23:10:12,695 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 1 @ 1521481212687
2018-03-19 23:10:12,844 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 1 (244193 bytes in 155 ms).
2018-03-19 23:10:13,687 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 2 @ 1521481213687
2018-03-19 23:10:13,744 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 2 (257342 bytes in 46 ms).
2018-03-19 23:10:14,687 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 3 @ 1521481214687
2018-03-19 23:10:14,786 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 3 (271359 bytes in 98 ms).
2018-03-19 23:10:15,688 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 4 @ 1521481215687
2018-03-19 23:10:15,780 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 4 (285375 bytes in 91 ms).
2018-03-19 23:10:16,687 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 5 @ 1521481216687
2018-03-19 23:10:16,764 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 5 (299392 bytes in 76 
ms).
----------------------------------------------------------------------------------
I did not get any "Asynchronous RocksDB snapshot ..." messages in the logs. 
Even after I changed the state backend properties in the flink-conf.yaml file, 
the log messages remained the same. I think there is some issue with detecting 
the correct state backend.
Regarding the following sentence,
------------------------------------------------------------------------------------
Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only do hard links locally to the
changed sst files and then copy the data in there to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.
------------------------------------------------------------------------------------
I have used the average checkpoint size in the charts, which was obtained from 
the Flink dashboard. I hope the values in the Flink dashboard give an accurate 
overall view of the checkpoint sizes. If not, could you please explain how to 
measure the size of an incremental checkpoint in Flink?


Thanks,
Miyuru

    On Monday, 19 March 2018, 19:46:36 GMT+5:30, Nico Kruber 
<n...@data-artisans.com> wrote:  
 
 Hi Miyuru,
Indeed, the behaviour you observed sounds strange and kind of goes against
the results Stefan presented in [1]. To see what is going on, can you
also share your changes to Flink's configuration, i.e. flink-conf.yaml?

Let's first make sure you're really comparing RocksDBStateBackend with
vs without incremental checkpoints:
- if you remove this from the code:
    env.setStateBackend(new RocksDBStateBackend(
          new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
          true));
then you will end up with the state backend configured via the
"state.backend" property. Was this set to "rocksdb"? Alternatively, you
can set the second parameter to the RocksDBStateBackend constructor to
false to get the right back-end.

You can also verify the values you see from the web interface by looking
into the logs (at INFO level). There, you should see reports like this:
"Asynchronous RocksDB snapshot (..., asynchronous part) in thread ...
took ... ms." and "Asynchronous RocksDB snapshot (..., synchronous part)
in thread ... took ... ms."
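
As a rough sketch of such a cross-check: the JobManager log should also
contain CheckpointCoordinator lines of the form "Completed checkpoint N
(X bytes in Y ms).", one per line in the actual log file. Assuming that
format, the helper below averages the reported sizes and durations so they
can be compared with the dashboard. The class CheckpointLogStats and its
methods are hypothetical (they are not part of Flink) and use only the JDK.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Flink: averages checkpoint size and duration from a log. */
final class CheckpointLogStats {
    // Matches e.g. "Completed checkpoint 5 (299392 bytes in 76 ms)."
    private static final Pattern COMPLETED =
            Pattern.compile("Completed checkpoint (\\d+) \\((\\d+) bytes in (\\d+) ms\\)");

    /** Returns {count, average bytes, average ms}; all zeros if no line matched. */
    static double[] summarize(List<String> logLines) {
        long count = 0;
        long totalBytes = 0;
        long totalMillis = 0;
        for (String line : logLines) {
            Matcher m = COMPLETED.matcher(line);
            if (m.find()) {
                count++;
                totalBytes += Long.parseLong(m.group(2));
                totalMillis += Long.parseLong(m.group(3));
            }
        }
        if (count == 0) {
            return new double[] {0, 0, 0};
        }
        return new double[] {count, (double) totalBytes / count, (double) totalMillis / count};
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the JobManager log file (an assumption; adjust to your setup)
        double[] s = summarize(Files.readAllLines(Paths.get(args[0])));
        System.out.printf("checkpoints=%d avgBytes=%.1f avgMillis=%.1f%n",
                (long) s[0], s[1], s[2]);
    }
}
```

Running it once against the log of the incremental run and once against the
log of the full run would give two pairs of averages to compare.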

Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only do hard links locally to the
changed sst files and then copy the data in there to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.
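
To illustrate the difference in what has to be copied, here is a toy model.
It is not Flink's or RocksDB's actual implementation: a checkpoint is
represented as a map from sst file name to size, a full checkpoint copies
every file, and an incremental one copies only the files that were not part
of the previous checkpoint. It relies on sst files being immutable, so a file
name identifies its contents. The class name, method names, file names and
sizes are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: compares the bytes copied by a full vs. an incremental checkpoint. */
final class CheckpointCopyModel {

    /** A full checkpoint copies every sst file that currently exists. */
    static long fullCopyBytes(Map<String, Long> current) {
        long total = 0;
        for (long size : current.values()) {
            total += size;
        }
        return total;
    }

    /** An incremental checkpoint copies only files absent from the previous checkpoint. */
    static long incrementalCopyBytes(Map<String, Long> previous, Map<String, Long> current) {
        long total = 0;
        for (Map.Entry<String, Long> e : current.entrySet()) {
            if (!previous.containsKey(e.getKey())) {
                total += e.getValue();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> cp1 = new HashMap<>();
        cp1.put("000001.sst", 64L);
        cp1.put("000002.sst", 64L);

        // One new file appeared between the two checkpoints.
        Map<String, Long> cp2 = new HashMap<>(cp1);
        cp2.put("000003.sst", 16L);

        // prints "full=144 incremental=16"
        System.out.println("full=" + fullCopyBytes(cp2)
                + " incremental=" + incrementalCopyBytes(cp1, cp2));
    }
}
```

In this model, if all files were rewritten between two checkpoints, every
file would be new and the incremental copy would be as large as the full one.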


Let's get started with this and see whether there is anything unusual.


Regards,
Nico


[1]
https://berlin.flink-forward.org/kb_sessions/a-look-at-flinks-internal-data-structures-and-algorithms-for-efficient-checkpointing/

On 19/03/18 05:25, Miyuru Dayarathna wrote:
> Hi,
> 
> We did a performance test of Flink's incremental checkpointing to
> measure the average time it takes to create a checkpoint and the average
> checkpoint file size. We did this test on a single computer in order to
> avoid the latencies introduced by network communication. The computer
> had Intel® Core™ i7-7600U CPU @ 2.80GHz, 4 cores, 16GB RAM, 450GB HDD,
> 101GB free SSD space. The computer was running on Ubuntu 16.04
> LTS, Apache Flink 1.4.1, Java version "1.8.0_152", Java(TM) SE Runtime
> Environment (build 1.8.0_152-b16), Java HotSpot(TM) 64-Bit Server VM
> (build 25.152-b16, mixed mode). We used the JVM flags -Xmx8g -Xms8g. The
> test was run for 40 minutes.
> 
> The Flink application we used is as follows,
> //---------------------------------------------------------------------------------------------------------
> public class LengthWindowIncrementalCheckpointing {
>     private static DataStream<Tuple4<String, Float, Integer, String>>
> inputStream = null;
>     private static final int PARALLELISM = 1;
>     private static final int timeoutMillis = 10;
>     private static final int WINDOWLENGTH = 10000;
>     private static final int SLIDELENGTH = 1;
>     private static Logger logger =
> LoggerFactory.getLogger(LengthWindowIncrementalCheckpointing.class);
> 
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // start a checkpoint every 1000 ms
>         env.enableCheckpointing(1000);
>         try {
>             env.setStateBackend(new RocksDBStateBackend(
>                     new
> FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>                     true));
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
> 
>         env.setBufferTimeout(timeoutMillis);
>         inputStream = env.addSource(new
> MicrobenchSourceFunction()).setParallelism(PARALLELISM).name("inputStream");
> 
>         DataStream<Tuple4<String, Float, Integer, String>>
> incrementStream2 =
>                 inputStream.filter(new FilterFunction<Tuple4<String,
> Float, Integer, String>>() {
>                     @Override
>                     public boolean filter(Tuple4<String, Float, Integer,
> String> tuple) throws Exception {
>                         if (tuple.f1 > 10) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).keyBy(1).countWindow(WINDOWLENGTH, SLIDELENGTH).sum(2);
>         incrementStream2.writeUsingOutputFormat(new
> DiscardingOutputFormat<Tuple4<String, Float, Integer,
>                 String>>());
> 
>         try {
>             env.execute("Flink application.");
>         } catch (Exception e) {
>             logger.error("Error in starting the Flink stream application: "
>                     + e.getMessage(), e);
>         }
>     }
> }
> 
> //---------------------------------------------------------------------------------------------------------
> 
> I have attached two charts (Average_latencies.jpg and
> Average_state_sizes.jpg) with the results and another image with the
> Flink dashboard (Flink-Dashboard.png). The average state size chart
> indicates that the size of an incremental checkpoint is smaller than a
> full (i.e., complete) checkpoint. This is the expected behavior from any
> incremental checkpointing system since the incremental checkpoint just
> stores the delta change. However, the average latency chart indicates
> that the average latency for taking an incremental checkpoint from Flink
> is larger than taking a complete (i.e., Full) checkpoint. Is this the
> expected behavior? I have highlighted the two fields in the
> "Flink-Dashboard.png" which we used as the fields for the average
> latency and the average state size. Note that to convert the incremental
> checkpointing application to full checkpointing application we just
> commented out the following lines in the above code.
> 
>         try {
>             env.setStateBackend(new RocksDBStateBackend(
>                     new
> FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>                     true));
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
> 
> Thanks,
> Miyuru
  
