I was glad to find that bravo has now been updated to support installing it
to a local Maven repo.

I was able to load a checkpoint created by my job, thanks to the example
provided in bravo README, but I'm still missing the essential piece.

My code was:

        OperatorStateReader reader =
                new OperatorStateReader(env2, savepoint, "DistinctFunction");
        DontKnowWhatTypeThisIs reducingState =
                reader.readKeyedStates(/* what should I put here? */);

I don't know how to read the values collected from reduce() calls in the
state. Is there a way to access the reducing state of the window with
bravo? I'm a bit confused about how this works, because when I check with a
debugger, Flink internally uses a ReducingStateDescriptor
with name=window-contents, yet reading operator state for
"DistinctFunction" at least didn't throw an exception ("window-contents"
threw – obviously there's no operator by that name).
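
To make my confusion concrete, here is a toy model of how I currently picture
the lookup: first by operator uid, then by state name, then by key and window.
This is plain Java, not Flink or bravo API. The class and method names are made
up for illustration, and the layout itself is my assumption rather than
something I have verified against Flink internals.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Flink or bravo API. All names here are hypothetical.
class ToySavepoint {
    // operator uid -> state name -> "key|window" -> stored value
    private final Map<String, Map<String, Map<String, String>>> operators = new HashMap<>();

    void put(String operatorUid, String stateName, String key, String window, String value) {
        operators
            .computeIfAbsent(operatorUid, uid -> new HashMap<>())
            .computeIfAbsent(stateName, name -> new HashMap<>())
            .put(key + "|" + window, value);
    }

    // Returns the stored value, or null if that key/window has no entry.
    // Throws if the operator uid or the state name is unknown.
    String get(String operatorUid, String stateName, String key, String window) {
        Map<String, Map<String, String>> states = operators.get(operatorUid);
        if (states == null) {
            throw new IllegalArgumentException("no operator with uid: " + operatorUid);
        }
        Map<String, String> entries = states.get(stateName);
        if (entries == null) {
            throw new IllegalArgumentException("no state named: " + stateName);
        }
        return entries.get(key + "|" + window);
    }

    public static void main(String[] args) {
        ToySavepoint savepoint = new ToySavepoint();
        savepoint.put("DistinctFunction", "window-contents",
                "AN12345", "2018-09-18", "s.aid1=AN12345");

        // Operator uid and state name are separate levels of the lookup.
        System.out.println(savepoint.get("DistinctFunction", "window-contents",
                "AN12345", "2018-09-18"));

        // Using the state name where an operator uid is expected fails.
        try {
            savepoint.get("window-contents", "window-contents", "AN12345", "2018-09-18");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this picture is roughly right, "DistinctFunction" selects the operator and
"window-contents" would be the state name within it, which would explain why
only the latter threw when I passed it as the operator.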

Cheers,
Juho

On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> wrote:

> Hi Stefan,
>
> Sorry but it doesn't seem immediately clear to me what's a good way to use
> https://github.com/king/bravo.
>
> How are people using it? Would you for example modify build.gradle somehow
> to publish the bravo as a library locally/internally? Or add code directly
> in the bravo project (locally) and run it from there (using an IDE, for
> example)? Also it doesn't seem like the bravo gradle project supports
> building a flink job jar, but if it does, how do I do it?
>
> Thanks.
>
> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote:
>
>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>
>> > How would you assume that backpressure would influence your updates?
>> Updates to each local state still happen event-by-event, in a single
>> reader/writing thread.
>>
>> Sure, just an ignorant guess by me. I'm not familiar with most of Flink's
>> internals. Anyway, high backpressure is not seen on this job after it has
>> caught up with the lag, so I thought it would be worth mentioning.
>>
>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>:
>>>
>>> > you could take a look at Bravo [1] to query your savepoints and to
>>> check if the state in the savepoint is complete w.r.t. your expectations
>>>
>>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
>>> missed ids were being logged by the reducer soon after the job had started
>>> (after restoring a savepoint). But on the other hand, after that I also
>>> made another savepoint & restored that, so what I could check is: does that
>>> next savepoint have the missed ids that were logged (a couple of minutes
>>> before the savepoint was created, so there should've been more than enough
>>> time to add them to the state before the savepoint was triggered) or not.
>>> Anyway, if I were able to verify with Bravo that the ids are missing
>>> from the savepoint (even though the reducer logged that it saw them), would
>>> that help in figuring out where they are lost? Is there some major
>>> difference compared to just looking at the final output after the window has
>>> been triggered?
>>>
>>>
>>>
>>> I think that makes a difference. For example, you can investigate if
>>> there is a state loss or a problem with the windowing. In the savepoint you
>>> could see which keys exist and to which windows they are assigned. Also
>>> just to make sure there is no misunderstanding: only elements that are in
>>> the state at the start of a savepoint are expected to be part of the
>>> savepoint; all elements between start and completion of the savepoint are
>>> not expected to be part of the savepoint.
>>>
>>>
>>> > I also doubt that the problem is about backpressure after restore,
>>> because the job will only continue running after the state restore is
>>> already completed.
>>>
>>> Yes, I'm not suspecting that the state restoring would be the problem
>>> either. My concern was about backpressure possibly messing with the updates
>>> of reducing state? I would tend to suspect that updating the state
>>> consistently is what fails, where heavy load / backpressure might be a
>>> factor.
>>>
>>>
>>>
>>> How would you assume that backpressure would influence your updates?
>>> Updates to each local state still happen event-by-event, in a single
>>> reader/writing thread.
>>>
>>>
>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> you could take a look at Bravo [1] to query your savepoints and to
>>>> check if the state in the savepoint is complete w.r.t. your expectations. I
>>>> somewhat doubt that there is a general problem with the state/savepoints
>>>> because many users are successfully running it on a large state and I am
>>>> not aware of any data loss problems, but nothing is impossible. What the
>>>> savepoint does is also straightforward: iterate a db snapshot and write
>>>> all key/value pairs to disk, so all data that was in the db at the time of
>>>> the savepoint should show up. I also doubt that the problem is about
>>>> backpressure after restore, because the job will only continue running
>>>> after the state restore is already completed. Did you check if you are
>>>> using exactly-once semantics or at-least-once semantics? Also did you check
>>>> that the Kafka consumer start position is configured properly [2]? Are
>>>> watermarks generated as expected after restore?
>>>>
>>>> One more unrelated high-level comment that I have: for a granularity of
>>>> 24h windows, I wonder if it would not make sense to use a batch job 
>>>> instead?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> [1] https://github.com/king/bravo
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>
>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>:
>>>>
>>>> Thanks for the suggestions!
>>>>
>>>> > In general, it would be tremendously helpful to have a minimal
>>>> working example which allows us to reproduce the problem.
>>>>
>>>> Definitely. The problem with reproducing has been that this only seems
>>>> to happen in the bigger production data volumes.
>>>>
>>>> That's why I'm hoping to find a way to debug this with the production
>>>> data. With that it seems to consistently cause some misses every time the
>>>> job is killed/restored.
>>>>
>>>> > check if it happens for shorter windows, like 1h etc
>>>>
>>>> What would be the benefit of that compared to 24h window?
>>>>
>>>> > simplify the job to not use a reduce window but simply a time window
>>>> which outputs the window events. Then counting the input and output events
>>>> should allow you to verify the results. If you are not seeing missing
>>>> events, then it could have something to do with the reducing state used in
>>>> the reduce function.
>>>>
>>>> Hm, maybe, but not sure how useful that would be, because it wouldn't
>>>> yet prove that it's related to reducing, because not having a reduce
>>>> function could also mean smaller load on the job, which might alone be
>>>> enough to make the problem not manifest.
>>>>
>>>> Is there a way to debug what goes into the reducing state (including
>>>> what gets removed or overwritten and what restored), if that makes sense..?
>>>> Maybe some suitable logging could be used to prove that the lost data is
>>>> written to the reducing state (or at least asked to be written), but not
>>>> found any more when the window closes and state is flushed?
>>>>
>>>> On configuration once more, we're using RocksDB state backend with
>>>> asynchronous incremental checkpointing. The state is restored from
>>>> savepoints though, we haven't been using those checkpoints in these tests
>>>> (although they could be used in case of crashes – but we haven't had those
>>>> now).
>>>>
>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> another idea to further narrow down the problem could be to simplify
>>>>> the job to not use a reduce window but simply a time window which outputs
>>>>> the window events. Then counting the input and output events should allow
>>>>> you to verify the results. If you are not seeing missing events, then it
>>>>> could have something to do with the reducing state used in the reduce
>>>>> function.
>>>>>
>>>>> In general, it would be tremendously helpful to have a minimal working
>>>>> example which allows us to reproduce the problem.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <
>>>>> and...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi Juho,
>>>>>>
>>>>>> can you try to reduce the job to a minimal reproducible example and
>>>>>> share the job and input?
>>>>>>
>>>>>> For example:
>>>>>> - some simple records as input, e.g. tuples of primitive types saved
>>>>>> as CSV
>>>>>> - a minimal deduplication job which processes them and misses records
>>>>>> - check if it happens for shorter windows, like 1h etc.
>>>>>> - the setup which you use for the job, ideally locally reproducible or
>>>>>> cloud
>>>>>>
>>>>>> Best,
>>>>>> Andrey
>>>>>>
>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>
>>>>>> Sorry to insist, but we seem to be blocked for any serious usage of
>>>>>> state in Flink if we can't rely on it to not miss data in case of
>>>>>> restore.
>>>>>>
>>>>>> Would anyone have suggestions for how to troubleshoot this? So far I
>>>>>> have verified with DEBUG logs that our reduce function gets to process
>>>>>> also the data that is missing from window output.
>>>>>>
>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Andrey,
>>>>>>>
>>>>>>> To rule out for good any questions about sink behaviour, the job was
>>>>>>> killed and started with an additional Kafka sink.
>>>>>>>
>>>>>>> The same number of ids were missed in both outputs: KafkaSink &
>>>>>>> BucketingSink.
>>>>>>>
>>>>>>> I wonder what would be the next steps in debugging?
>>>>>>>
>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, Andrey.
>>>>>>>>
>>>>>>>> > so it means that the savepoint does not lose at least some
>>>>>>>> dropped records.
>>>>>>>>
>>>>>>>> I'm not sure what you mean by that? I mean, it was known from the
>>>>>>>> beginning that not everything is lost before/after restoring a
>>>>>>>> savepoint, just some records around the time of restoration. It's not
>>>>>>>> 100% clear whether records are lost before making a savepoint or after
>>>>>>>> restoring it. Although, based on the new DEBUG logs it seems more like
>>>>>>>> losing some records that are seen ~soon after restoring. It seems like
>>>>>>>> Flink would be somehow confused about the restored state vs. new
>>>>>>>> inserts to state. This could also be somehow linked to the high
>>>>>>>> backpressure on the Kafka source while the stream is catching up.
>>>>>>>>
>>>>>>>> > If it is feasible for your setup, I suggest to insert one more
>>>>>>>> map function after reduce and before sink.
>>>>>>>> > etc.
>>>>>>>>
>>>>>>>> Isn't that the same thing that we discussed before? Nothing is sent
>>>>>>>> to BucketingSink before the window closes, so I don't see how it would
>>>>>>>> make any difference if we replace the BucketingSink with a map function
>>>>>>>> or another sink type. We don't create or restore savepoints during the
>>>>>>>> time when BucketingSink gets input or has open buckets – that happens
>>>>>>>> at a much later time of day. I would focus on figuring out why the
>>>>>>>> records are lost while the window is open. But I don't know how to do
>>>>>>>> that. Would you have any additional suggestions?
>>>>>>>>
>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Juho,
>>>>>>>>>
>>>>>>>>> so it means that the savepoint does not lose at least some
>>>>>>>>> dropped records.
>>>>>>>>>
>>>>>>>>> If it is feasible for your setup, I suggest inserting one more map
>>>>>>>>> function after reduce and before sink.
>>>>>>>>> The map function should be called right after the window is triggered
>>>>>>>>> but before flushing to s3.
>>>>>>>>> The result of reduce (deduped record) could be logged there.
>>>>>>>>> This should allow us to check whether the processed distinct records
>>>>>>>>> were buffered in the state after the restoration from the savepoint
>>>>>>>>> or not. If they were buffered we should see that there was an attempt
>>>>>>>>> to write them to the sink from the state.
>>>>>>>>>
>>>>>>>>> Another suggestion is to try to write records to some other sink
>>>>>>>>> or to both.
>>>>>>>>> E.g. if you can access the file system of the workers, maybe just into
>>>>>>>>> local files and check whether the records are also dropped there.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Andrey
>>>>>>>>>
>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Andrey!
>>>>>>>>>
>>>>>>>>> I was finally able to gather the DEBUG logs that you suggested. In
>>>>>>>>> short, the reducer logged that it processed at least some of the ids
>>>>>>>>> that were missing from the output.
>>>>>>>>>
>>>>>>>>> "At least some", because I didn't have the job running with DEBUG
>>>>>>>>> logs for the full 24-hour window period. So I was only able to look
>>>>>>>>> up if I can find *some* of the missing ids in the DEBUG logs. Which I
>>>>>>>>> did indeed.
>>>>>>>>>
>>>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>         // Keep the first record seen for the key; later duplicates are dropped.
>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>>>>>>>>         return value1;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> Then:
>>>>>>>>>
>>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>>
>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>
>>>>>>>>> Then I ran the following kind of test:
>>>>>>>>>
>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18
>>>>>>>>> 08:35 UTC 2018
>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13,
>>>>>>>>> restored from that previous cluster's savepoint
>>>>>>>>> - Ran until caught up offsets
>>>>>>>>> - Cancelled the job with a new savepoint
>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new
>>>>>>>>> savepoint, let it keep running so that it will eventually write the
>>>>>>>>> output
>>>>>>>>>
>>>>>>>>> Then on the next day, after results had been flushed when the
>>>>>>>>> 24-hour window closed, I compared the results again with a batch
>>>>>>>>> version's output. And found some missing ids as usual.
>>>>>>>>>
>>>>>>>>> I drilled down to one specific missing id (I'm replacing the
>>>>>>>>> actual value with AN12345 below), which was not found in the stream
>>>>>>>>> output, but was found in batch output & flink DEBUG logs.
>>>>>>>>>
>>>>>>>>> Related to that id, I gathered the following information:
>>>>>>>>>
>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>>>>>>>
>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first
>>>>>>>>> time, proved by this log line:
>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction - DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>
>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>>>
>>>>>>>>> (
>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1
>>>>>>>>> min delay before next)
>>>>>>>>> /
>>>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>>>> )
>>>>>>>>>
>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>>>>>>>>>
>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>>>
>>>>>>>>> To be noted, there was high backpressure after restoring from
>>>>>>>>> savepoint until the stream caught up with the Kafka offsets.
>>>>>>>>> However, our job assigns timestamps & watermarks on the Flink Kafka
>>>>>>>>> consumer itself, so event time of all partitions is synchronized. As
>>>>>>>>> expected, we don't get any late data in the late data side output.
>>>>>>>>>
>>>>>>>>> From this we can see that the missing ids are processed by the
>>>>>>>>> reducer, but they must get lost somewhere before the 24-hour window is
>>>>>>>>> triggered.
>>>>>>>>>
>>>>>>>>> I think it's worth mentioning once more that the stream doesn't
>>>>>>>>> miss any ids if we let it run without interruptions / state
>>>>>>>>> restoring.
>>>>>>>>>
>>>>>>>>> What's next?
>>>>>>>>>
>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Juho,
>>>>>>>>>>
>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a
>>>>>>>>>> burst of input
>>>>>>>>>>
>>>>>>>>>> This is of course totally true, my understanding is the same. We
>>>>>>>>>> cannot exclude a problem there for sure, it's just that savepoints
>>>>>>>>>> are used a lot w/o problem reports and BucketingSink is known to be
>>>>>>>>>> problematic with s3. That is why I asked you:
>>>>>>>>>>
>>>>>>>>>> > You also wrote that the timestamps of lost event are 'probably'
>>>>>>>>>> around the time of the savepoint, if it is not yet for sure I would
>>>>>>>>>> also check it.
>>>>>>>>>>
>>>>>>>>>> Although, bucketing sink might lose any data at the end of the
>>>>>>>>>> day (also from the middle). The fact that it is always around the
>>>>>>>>>> time of taking a savepoint and not random is surely suspicious, and
>>>>>>>>>> possible savepoint failures need to be investigated.
>>>>>>>>>>
>>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>>
>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to the key
>>>>>>>>>> name (to find if the object exists) before creating the object,
>>>>>>>>>> Amazon S3 provides 'eventual consistency' for read-after-write.
>>>>>>>>>>
>>>>>>>>>> The algorithm you suggest is roughly how it is implemented now
>>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>>>> 'eventual consistency' means that even if you just created a file
>>>>>>>>>> (its name is the key), it can be that you do not get it in the list,
>>>>>>>>>> or exists (HEAD) returns false, and you risk overwriting the previous
>>>>>>>>>> part.
>>>>>>>>>>
>>>>>>>>>> The BucketingSink was designed for a standard file system. s3 is
>>>>>>>>>> used over a file system wrapper atm but does not always provide
>>>>>>>>>> normal file system guarantees. See also the last example in [1].
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Andrey
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>
>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll
>>>>>>>>>> try them.
>>>>>>>>>>
>>>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>>>
>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>>> sure. I would also check whether the size of missing events is
>>>>>>>>>> around the batch size of BucketingSink or not.
>>>>>>>>>>
>>>>>>>>>> Fair enough, but I also want to focus on debugging the most
>>>>>>>>>> probable subject first. So what do you think about this – true or
>>>>>>>>>> false: only when the 24-hour window triggers, BucketingSink gets a
>>>>>>>>>> burst of input. Around the state restoring point (middle of the day)
>>>>>>>>>> it doesn't get any input, so it can't lose anything either. Isn't
>>>>>>>>>> this true, or have I totally missed how Flink works in triggering
>>>>>>>>>> window results? I would not expect there to be any optimization that
>>>>>>>>>> speculatively triggers early results of a regular time window to the
>>>>>>>>>> downstream operators.
>>>>>>>>>>
>>>>>>>>>> > The old BucketingSink has in general problem with s3.
>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list already
>>>>>>>>>> written file parts (batches) and determine index of the next part to
>>>>>>>>>> start. Due to eventual consistency of checking file existence in s3
>>>>>>>>>> [1], the BucketingSink can rewrite the previously written part and
>>>>>>>>>> basically lose it.
>>>>>>>>>>
>>>>>>>>>> I was wondering what S3's "read-after-write consistency"
>>>>>>>>>> (mentioned on the page you linked) actually means. It seems that
>>>>>>>>>> this might be possible:
>>>>>>>>>> - LIST keys, find current max index
>>>>>>>>>> - choose next index = max + 1
>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key
>>>>>>>>>> doesn't exist on S3
>>>>>>>>>>
>>>>>>>>>> But definitely sounds easier if a sink keeps track of files in a
>>>>>>>>>> way that's guaranteed to be consistent.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Juho
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is
>>>>>>>>>>> planned for the next 1.7 release, sorry for the confusion.
>>>>>>>>>>> The old BucketingSink has a general problem with s3.
>>>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>>>> to list already written file parts (batches) and determine the index
>>>>>>>>>>> of the next part to start. Due to eventual consistency of checking
>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the
>>>>>>>>>>> previously written part and basically lose it. It should be fixed
>>>>>>>>>>> for StreamingFileSink in 1.7, where Flink keeps its own track of
>>>>>>>>>>> written parts and does not rely on s3 as a file system.
>>>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>>>
>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>>>> sure, I would also check whether the size of missing events is
>>>>>>>>>>> around the batch size of BucketingSink or not. You also wrote that
>>>>>>>>>>> the timestamps of lost events are 'probably' around the time of the
>>>>>>>>>>> savepoint; if it is not yet for sure, I would also check it.
>>>>>>>>>>>
>>>>>>>>>>> Have you already checked the log files of job manager and task
>>>>>>>>>>> managers for the job running before and after the restore from the
>>>>>>>>>>> checkpoint? Is everything successful there, no errors, relevant
>>>>>>>>>>> warnings or exceptions?
>>>>>>>>>>>
>>>>>>>>>>> As the next step, I would suggest logging all encountered events
>>>>>>>>>>> in DistinctFunction.reduce if possible for production data and check
>>>>>>>>>>> whether the missed events are eventually processed before or after
>>>>>>>>>>> the savepoint. The following log message indicates a border between
>>>>>>>>>>> the events that should be included into the savepoint (logged
>>>>>>>>>>> before) or not:
>>>>>>>>>>> "{} ({}, synchronous part) in thread {} took {} ms" (template)
>>>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Andrey
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>>
>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Using StreamingFileSink is not a convenient option for
>>>>>>>>>>> production use for us as it doesn't support s3*. I could use
>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point in
>>>>>>>>>>> doing so. Please consider my previous comment:
>>>>>>>>>>>
>>>>>>>>>>> > I realized that BucketingSink must not play any role in this
>>>>>>>>>>> problem. This is because only when the 24-hour window triggers,
>>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring
>>>>>>>>>>> point (middle of the day) it doesn't get any input, so it can't lose
>>>>>>>>>>> anything either (right?).
>>>>>>>>>>>
>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine how
>>>>>>>>>>> there could be any difference. It's very real that the sink doesn't
>>>>>>>>>>> get any input for a long time until the 24-hour window closes, and
>>>>>>>>>>> then it quickly writes out everything because it's not that much
>>>>>>>>>>> data eventually for the distinct values.
>>>>>>>>>>>
>>>>>>>>>>> Any ideas for debugging what's happening around the savepoint &
>>>>>>>>>>> restoration time?
>>>>>>>>>>>
>>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative
>>>>>>>>>>> sink. This was before I came to realize that most likely the sink
>>>>>>>>>>> component has nothing to do with the data loss problem. I tried it
>>>>>>>>>>> with s3n:// path just to see an exception being thrown. In the
>>>>>>>>>>> source code I indeed then found an explicit check for the target
>>>>>>>>>>> path scheme to be "hdfs://".
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ok, I think before further debugging the window reduced state,
>>>>>>>>>>>> could you try the new 'StreamingFileSink' [1] introduced in
>>>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink'?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Andrey
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>>
>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems
>>>>>>>>>>>> like there's a bug somewhere now that the output is missing some
>>>>>>>>>>>> data.
>>>>>>>>>>>>
>>>>>>>>>>>> > I would wait and check the actual output in s3 because it is
>>>>>>>>>>>> the main result of the job
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, and that's what I have already done. There seems to be
>>>>>>>>>>>> always some data loss with the production data volumes, if the job
>>>>>>>>>>>> has been restarted on that day.
>>>>>>>>>>>>
>>>>>>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>>>>>>>
>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>
>>>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because it
>>>>>>>>>>>>> is the main result of the job and
>>>>>>>>>>>>>
>>>>>>>>>>>>> > The late data around the time of taking savepoint might be
>>>>>>>>>>>>> not included into the savepoint but it should be behind the
>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The savepoint is a snapshot of the data in transit which is
>>>>>>>>>>>>> already consumed from Kafka.
>>>>>>>>>>>>> Basically the full contents of the window result is split
>>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed
>>>>>>>>>>>>> offset in Kafka but before the window result is written into s3.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying that
>>>>>>>>>>>>> the final result in s3 should include all records after it.
>>>>>>>>>>>>> This is what should be guaranteed but not the contents of the
>>>>>>>>>>>>> intermediate savepoint.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I check for the missed data from the final output on s3. So I
>>>>>>>>>>>>> wait until the next day, then run the same thing re-implemented
>>>>>>>>>>>>> in batch, and compare the output.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > The late data around the time of taking savepoint might be
>>>>>>>>>>>>> not included into the savepoint but it should be behind the
>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a
>>>>>>>>>>>>> bug somewhere.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Then it should just come later after the restore and should
>>>>>>>>>>>>> be reduced within the allowed lateness into the final result
>>>>>>>>>>>>> which is saved into s3.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role
>>>>>>>>>>>>> here, because I started running the job with allowedLateness=0,
>>>>>>>>>>>>> and still get the data loss, while my late data output doesn't
>>>>>>>>>>>>> receive anything.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or
>>>>>>>>>>>>> the actual implementation, basically saving just one of records
>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy
>>>>>>>>>>>>> before the DistinctFunction. So there's one record for each key
>>>>>>>>>>>>> (which is the combination of a couple of fields). In practice I've
>>>>>>>>>>>>> seen that we're missing ~2000-4000 elements on each restore, and
>>>>>>>>>>>>> the total output is obviously much more than that.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>
>>>>>>>>>>>>> import java.util.Map;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>>>>>>>>>>> import org.apache.flink.api.java.tuple.Tuple;
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>>>>>>>>         // Build a TupleN (N = number of fields) from the selected values;
>>>>>>>>>>>>>         // missing fields default to the empty string.
>>>>>>>>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Where exactly does the data go missing? When do you notice that?
>>>>>>>>>>>>>> Do you check it by:
>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resuming in
>>>>>>>>>>>>>> the middle of the day
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>> - finding some distinct records missing in the final output of
>>>>>>>>>>>>>> BucketingSink in s3, after the window result is actually
>>>>>>>>>>>>>> triggered and saved into s3 at the end of the day? Is this the
>>>>>>>>>>>>>> main output?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The late data around the time of taking the savepoint might not
>>>>>>>>>>>>>> be included in the savepoint, but it should be behind the
>>>>>>>>>>>>>> snapshotted offset in Kafka. Then it should just come later,
>>>>>>>>>>>>>> after the restore, and should be reduced within the allowed
>>>>>>>>>>>>>> lateness into the final result which is saved into s3.
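
To make the allowed-lateness reasoning above concrete, here is a minimal plain-Java sketch of when an element would no longer be reduced into its window. It is only a simplified model under my own assumptions (tumbling event-time windows aligned to the epoch, and a window treated as expired once the watermark reaches its last timestamp plus the allowed lateness). The class and method names are hypothetical; this is not Flink code.

```java
class LatenessSketch {

    // Start of the tumbling window that contains the timestamp (windows aligned to epoch 0).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    // Last timestamp that still belongs to the window starting at windowStart.
    static long maxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    // Assumed rule: the element is dropped (or sent to the late-data side output)
    // when the watermark has already reached the window's cleanup time.
    static boolean isDropped(long eventTs, long watermark, long windowSize, long allowedLateness) {
        long cleanupTime = maxTimestamp(windowStart(eventTs, windowSize), windowSize) + allowedLateness;
        return cleanupTime <= watermark;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;   // 24-hour window, as in the job
        long oneMinute = 60_000L;          // allowedLateness from the original setup
        long watermarkAtWindowEnd = day - 1;

        System.out.println(isDropped(1_000L, watermarkAtWindowEnd, day, oneMinute));
        System.out.println(isDropped(1_000L, watermarkAtWindowEnd, day, 0L));
    }
}
```

Under this model, an element read after the restore is only lost if the watermark has already passed the end of its 24-hour window (plus lateness), which should not be the case for a restore in the middle of the day.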
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or
>>>>>>>>>>>>>> the actual implementation, basically saving just one of the
>>>>>>>>>>>>>> records inside the 24h window in s3? Then what is missing there?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing data
>>>>>>>>>>>>>> when restoring from savepoint.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in this
>>>>>>>>>>>>>>> problem. This is because BucketingSink gets a burst of input
>>>>>>>>>>>>>>> only when the 24-hour window triggers. Around the state
>>>>>>>>>>>>>>> restoring point (middle of the day) it doesn't get any input,
>>>>>>>>>>>>>>> so it can't lose anything either (right?).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from
>>>>>>>>>>>>>>> the equation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In the meantime, please let me know if you have any
>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs
>>>>>>>>>>>>>>> to enable.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues
>>>>>>>>>>>>>>> with that, that could contribute to lost data when restoring a 
>>>>>>>>>>>>>>> savepoint?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when
>>>>>>>>>>>>>>>> state is restored from a savepoint.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly
>>>>>>>>>>>>>>>> the data gets dropped?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It
>>>>>>>>>>>>>>>> doesn't have any custom state management.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from that
>>>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a
>>>>>>>>>>>>>>>> small amount of data. The event time of lost data is probably
>>>>>>>>>>>>>>>> around the time of savepoint. In other words the rest of the
>>>>>>>>>>>>>>>> time window is not entirely missed: collection works correctly
>>>>>>>>>>>>>>>> also for (most of the) events that come in after restoring.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without
>>>>>>>>>>>>>>>> interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments
>>>>>>>>>>>>>>>> that have smaller parallelism and smaller data volumes. But in
>>>>>>>>>>>>>>>> production volumes the job seems to be consistently missing at
>>>>>>>>>>>>>>>> least something on every restore.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This issue has consistently happened since the job was
>>>>>>>>>>>>>>>> initially created. It was first run on an older Flink
>>>>>>>>>>>>>>>> 1.5-SNAPSHOT build and it still happens on both Flink 1.5.2 &
>>>>>>>>>>>>>>>> 1.6.0.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm wondering if this could be, for example, some
>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets and
>>>>>>>>>>>>>>>> what's been written by BucketingSink?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>>>>>>>>                 .setParallelism(8);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> import java.util.Map;
>>>>>>>>>>>>>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>> public class DistinctFunction implements ReduceFunction<Map<String, String>> {
>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>         // Keep the first record seen for the key and ignore later ones.
>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> }
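
For readers following along, the effect of keyBy plus this reduce can be sketched in plain Java without any Flink dependency. This is only an illustration under my own assumptions: the per-key state is modelled as an in-memory map, the composite key as a list of field values, and all names (DistinctSketch, keyOf, event, distinct) are hypothetical. It says nothing about how Flink stores the reducing state internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DistinctSketch {

    // Hypothetical stand-in for MapKeySelector: the chosen field values form the key,
    // with missing fields keyed as the empty string.
    static List<String> keyOf(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>();
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    // Small helper to build an event from alternating name/value arguments.
    static Map<String, String> event(String... namesAndValues) {
        Map<String, String> e = new HashMap<>();
        for (int i = 0; i + 1 < namesAndValues.length; i += 2) {
            e.put(namesAndValues[i], namesAndValues[i + 1]);
        }
        return e;
    }

    // Models one window: per key, reduce(value1, value2) returns value1,
    // so the first record seen for each key is the one that survives.
    static List<Map<String, String>> distinct(List<Map<String, String>> events, String... fields) {
        Map<List<String>, Map<String, String>> state = new LinkedHashMap<>();
        for (Map<String, String> e : events) {
            state.merge(keyOf(e, fields), e, (value1, value2) -> value1);
        }
        return new ArrayList<>(state.values());
    }

    public static void main(String[] args) {
        List<Map<String, String>> events = new ArrayList<>();
        events.add(event("ID", "1", "PLAYER_ID", "p1"));
        events.add(event("ID", "1", "PLAYER_ID", "p1"));
        events.add(event("ID", "2", "PLAYER_ID", "p1"));
        System.out.println(distinct(events, "ID", "PLAYER_ID").size());
    }
}
```

In this model a "missing" record in the output means that no element for that key reached the reduce step within the window at all, since any single arrival is enough to produce one output row.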
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>>>>>>>>>>> Interval: 1m 0s
>>>>>>>>>>>>>>>> Timeout: 10m 0s
>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>>>>>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We use BucketingSink. I don't think there's anything
>>>>>>>>>>>>>>>> special here, apart from the fact that we're writing to S3.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink =
>>>>>>>>>>>>>>>>                 new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>                         .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>>                         .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>                         .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>                         .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>>>>>>>>>>>> synchronize watermarks across all kafka partitions. We also write
>>>>>>>>>>>>>>>> late data to side output, but nothing is written there. If
>>>>>>>>>>>>>>>> anything were, it could explain missed data in the main output
>>>>>>>>>>>>>>>> (I'm also sure that our late data writing works, because we
>>>>>>>>>>>>>>>> previously had some actual late data which ended up there).
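
As a rough illustration of the watermarking described above, the following plain-Java sketch models a bounded-out-of-orderness watermark per Kafka partition, combined by taking the minimum across partitions. It is a simplified model under my own assumptions (each partition's watermark is its highest seen timestamp minus the allowed out-of-orderness, and a partition with no data yet holds the overall watermark back). The class name and methods are hypothetical and do not reproduce Flink's implementation.

```java
import java.util.Arrays;

class WatermarkSketch {

    private final long maxOutOfOrderness;
    // Highest event timestamp seen so far on each partition; Long.MIN_VALUE means "no data yet".
    private final long[] maxTimestampPerPartition;

    WatermarkSketch(int partitions, long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.maxTimestampPerPartition = new long[partitions];
        Arrays.fill(maxTimestampPerPartition, Long.MIN_VALUE);
    }

    // Record an event timestamp for a partition; out-of-order events never move the maximum back.
    void onEvent(int partition, long timestamp) {
        maxTimestampPerPartition[partition] =
                Math.max(maxTimestampPerPartition[partition], timestamp);
    }

    // Overall watermark: the minimum of the per-partition watermarks.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long maxTs : maxTimestampPerPartition) {
            if (maxTs == Long.MIN_VALUE) {
                return Long.MIN_VALUE; // an idle partition holds the watermark back
            }
            min = Math.min(min, maxTs - maxOutOfOrderness);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(2, 10L);
        wm.onEvent(0, 100L);
        wm.onEvent(1, 50L);
        System.out.println(wm.currentWatermark());
    }
}
```

In this model an element only counts as late if its window has already expired relative to the slowest partition's watermark, which is why an empty late-data side output suggests that lateness is not the cause of the missing records.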
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It may or may not be relevant that I have also enabled
>>>>>>>>>>>>>>>> allowedLateness with 1 minute of lateness on the 24-hour window.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness
>>>>>>>>>>>>>>>> entirely? That would be just to rule out a Flink bug related to
>>>>>>>>>>>>>>>> restoring state in combination with the allowedLateness feature.
>>>>>>>>>>>>>>>> After all, all of our data should be in a good enough order to
>>>>>>>>>>>>>>>> not be late, given the max out-of-orderness used on the kafka
>>>>>>>>>>>>>>>> consumer timestamp extractor.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>>
