On Mon, Mar 4, 2019 at 9:18 AM Reuven Lax <[email protected]> wrote:

>
>
> On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles <[email protected]> wrote:
>
>>
>>
>> On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels <[email protected]> wrote:
>>
>>> > If you randomly generate shard ids, buffer those until finalization,
>>> finalize a checkpoint so that you never need to re-run that generation,
>>> isn't the result stable from that point onwards?
>>>
>>> Yes, you're right :) For @RequiresStableInput we will always have to
>>> buffer and emit only after a finalized checkpoint.
>>>
>>> 2PC is the better model for Flink, at least in the case of Kafka because
>>> it can offload the buffering to Kafka via its transactions.
>>> RequiresStableInput is a more general solution and it is feasible to
>>> support it in the Flink Runner. However, we have to make sure that
>>> checkpoints are taken frequently to avoid too much memory pressure.
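
To make the buffering concrete, here is a minimal sketch of "buffer and
emit only after a finalized checkpoint". It is not Flink Runner code: the
class StableInputBuffer and its methods add, snapshot and
onCheckpointComplete are hypothetical names, and the checkpoint ids are
assumed to increase monotonically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: holds elements back until their checkpoint is finalized. */
class StableInputBuffer<T> {
  // Elements received since the last snapshot was taken.
  private List<T> pending = new ArrayList<>();
  // Elements captured by a snapshot that has not been finalized yet.
  private final TreeMap<Long, List<T>> snapshotted = new TreeMap<>();

  /** Called for every incoming element; nothing is emitted yet. */
  void add(T element) {
    pending.add(element);
  }

  /** Called when a checkpoint is taken: pending elements become part of it. */
  void snapshot(long checkpointId) {
    snapshotted.put(checkpointId, pending);
    pending = new ArrayList<>();
  }

  /** Called once a checkpoint is finalized: release everything it covers. */
  List<T> onCheckpointComplete(long checkpointId) {
    List<T> stable = new ArrayList<>();
    // View of all snapshots with an id up to and including checkpointId.
    Map<Long, List<T>> covered = snapshotted.headMap(checkpointId, true);
    for (List<T> batch : covered.values()) {
      stable.addAll(batch);
    }
    covered.clear(); // also removes the entries from the backing map
    return stable;
  }

  /** Everything still held in memory; grows with the checkpoint interval. */
  int bufferedCount() {
    int n = pending.size();
    for (List<T> batch : snapshotted.values()) {
      n += batch.size();
    }
    return n;
  }
}

class StableInputSketch {
  public static void main(String[] args) {
    StableInputBuffer<String> buffer = new StableInputBuffer<>();
    buffer.add("a");
    buffer.add("b");
    buffer.snapshot(1L);
    buffer.add("c"); // arrives after checkpoint 1 was taken
    System.out.println(buffer.onCheckpointComplete(1L));
    System.out.println(buffer.bufferedCount());
  }
}
```

bufferedCount() is the memory pressure mentioned above: everything that
arrives between two finalized checkpoints has to be held somewhere.
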
>>
>>
>>> It would be nice to also support 2PC in Beam, i.e. the Runner could
>>> choose to either buffer/materialize input or do a 2PC, but it would also
>>> break the purity of the existing model.
>>>
>>
>> Still digging in to details. I think the "generate random shard ids &
>> buffer" is a tradition but more specific to BigQueryIO or FileIO styles. It
>> doesn't have to be done that way if the target system has special support
>> like Kafka does.
>>
>> For Kafka, can you get the 2PC behavior like this: Upstream step: open a
>> transaction, write a bunch of stuff to it (let Kafka do the buffering) and
>> emit a transaction identifier. Downstream @RequiresStableInput step: close
>> transaction. Again, I may be totally missing something, but I think that
>> this has identical characteristics:
>>
>
> Does Kafka garbage collect this eventually in the case where you crash and
> start again with a different transaction identifier?
>

I believe that is what I read on the page about Flink's Kafka 2PC, though I
cannot find it any more. What would the alternative be for Kafka? You
always have to be ready for a client that goes away.
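
As a rough illustration of the behavior I have in mind, here is a toy
model of the two-step idea. It is only a sketch and does not use the Kafka
client API: ToyTransactionLog, its method names and the timeout value are
all made up for this example. The upstream step opens a transaction and
writes to it, the downstream step commits by identifier, and a transaction
that nobody commits is dropped once the timeout passes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for a transactional log; NOT the Kafka client API. */
class ToyTransactionLog {
  private final long timeoutMillis;
  private final Map<String, List<String>> open = new HashMap<>();
  private final Map<String, Long> openedAt = new HashMap<>();
  private final Set<String> committedIds = new HashSet<>();
  private final List<String> visible = new ArrayList<>();

  ToyTransactionLog(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  /** Upstream step: open a transaction; the log does the buffering. */
  void begin(String txnId, long nowMillis) {
    open.put(txnId, new ArrayList<>());
    openedAt.put(txnId, nowMillis);
  }

  void write(String txnId, String record) {
    open.get(txnId).add(record);
  }

  /** Downstream step: commit by identifier; retrying the same id is a no-op. */
  boolean commit(String txnId) {
    if (committedIds.contains(txnId)) {
      return true; // retry after a successful commit
    }
    List<String> records = open.remove(txnId);
    if (records == null) {
      return false; // unknown or already timed out
    }
    openedAt.remove(txnId);
    visible.addAll(records);
    committedIds.add(txnId);
    return true;
  }

  /** Drops every open transaction that is older than the timeout. */
  void expire(long nowMillis) {
    openedAt.entrySet().removeIf(e -> {
      if (nowMillis - e.getValue() >= timeoutMillis) {
        open.remove(e.getKey());
        return true;
      }
      return false;
    });
  }

  List<String> visibleRecords() {
    return new ArrayList<>(visible);
  }
}

class TwoStepCommitSketch {
  public static void main(String[] args) {
    ToyTransactionLog log = new ToyTransactionLog(1000);
    // Attempt 1 crashes before checkpoint finalization; "txn-a" is abandoned.
    log.begin("txn-a", 0);
    log.write("txn-a", "r1");
    // Attempt 2 restarts with a different identifier and gets finalized.
    log.begin("txn-b", 500);
    log.write("txn-b", "r1");
    log.commit("txn-b");
    log.commit("txn-b"); // retry after finalization, same identifier
    log.expire(2000);    // the abandoned transaction times out
    System.out.println(log.visibleRecords());
  }
}
```

In this model the abandoned transaction never becomes visible, and a
retried commit with the same identifier does not duplicate records.
Whether the real system behaves this way is the part I would like to
confirm.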

Kenn


>
>>  - Kafka does the buffering
>>  - checkpoint finalization is the driver of latency
>>  - failure before checkpoint finalization means the old transaction sits
>> around and times out eventually
>>  - failure after checkpoint finalization causes retry with the same
>> transaction identifier
>>
>> Kenn
>>
>>
>>>
>>> On 01.03.19 19:42, Kenneth Knowles wrote:
>>> > I think I am fundamentally misunderstanding checkpointing in Flink.
>>> >
>>> > If you randomly generate shard ids, buffer those until finalization,
>>> > finalize a checkpoint so that you never need to re-run that
>>> generation,
>>> > isn't the result stable from that point onwards?
>>> >
>>> > Kenn
>>> >
>>> > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels <[email protected]> wrote:
>>> >
>>> >     Fully agree. I think we can improve the situation drastically. For
>>> >     KafkaIO EOS with Flink we need to make these two changes:
>>> >
>>> >     1) Introduce buffering while the checkpoint is being taken
>>> >     2) Replace the random shard id assignment with something
>>> deterministic
>>> >
>>> >     However, we won't be able to provide full compatibility with
>>> >     RequiresStableInput because Flink only guarantees stable input
>>> after a
>>> >     checkpoint. RequiresStableInput requires input at any point in
>>> time to
>>> >     be stable. IMHO the only way to achieve that is materializing
>>> output
>>> >     which Flink does not currently support.
>>> >
>>> >     KafkaIO does not need all the power of RequiresStableInput to
>>> achieve
>>> >     EOS with Flink, but for the general case I don't see a good
>>> solution at
>>> >     the moment.
>>> >
>>> >     -Max
>>> >
>>> >     On 01.03.19 16:45, Reuven Lax wrote:
>>> >      > Yeah, the person who was working on it originally stopped
>>> working on
>>> >      > Beam, and nobody else ever finished it. I think it is important
>>> to
>>> >      > finish though. Many of the existing Sinks are only fully
>>> correct for
>>> >      > Dataflow today, because they generate either Reshuffle or
>>> >     GroupByKey to
>>> >      > ensure input stability before outputting (in many cases this
>>> code
>>> >     was
>>> >      > inherited from before Beam existed). On Flink today, these sinks
>>> >     might
>>> >      > occasionally produce duplicate output in the case of failures.
>>> >      >
>>> >      > Reuven
>>> >      >
>>> >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
>>> >      >
>>> >      >     Circling back to the RequiresStableInput annotation[1]. I've
>>> >     done some
>>> >      >     prototyping to see how this could be integrated into Flink.
>>> I'm
>>> >      >     currently
>>> >      >     writing a test based on RequiresStableInput.
>>> >      >
>>> >      >     I found out there are already checks in place at the
>>> Runners to
>>> >      >     throw in
>>> >      >     case transforms use RequiresStableInput and it's not
>>> >     supported. However,
>>> >      >     not a single transform actually uses the annotation.
>>> >      >
>>> >      >     It seems that the effort stopped at some point? Would it
>>> make
>>> >     sense to
>>> >      >     start annotating KafkaExactlyOnceSink with
>>> >     @RequiresStableInput? We
>>> >      >     could then get rid of the whitelist.
>>> >      >
>>> >      >     -Max
>>> >      >
>>> >      >     [1]
>>> >      >
>>> >
>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>> >      >
>>> >      >
>>> >      >
>>> >      >     On 01.03.19 14:28, Maximilian Michels wrote:
>>> >      >      > Just realized that transactions do not span multiple
>>> >     elements in
>>> >      >      > KafkaExactlyOnceSink. So the proposed solution to stop
>>> >     processing
>>> >      >      > elements while a snapshot is pending would work.
>>> >      >      >
>>> >      >      > It is certainly not optimal in terms of performance for
>>> >     Flink and
>>> >      >     poses
>>> >      >      > problems when checkpoints take long to complete, but it
>>> >     would be
>>> >      >      > worthwhile to implement this to make use of the EOS
>>> feature.
>>> >      >      >
>>> >      >      > Thanks,
>>> >      >      > Max
>>> >      >      >
>>> >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
>>> >      >      >> Thank you for the prompt replies. It's great to see
>>> that
>>> >     there is
>>> >      >      >> good understanding of how EOS in Flink works.
>>> >      >      >>
>>> >      >      >>> This is exactly what RequiresStableInput is supposed to
>>> >     do. On the
>>> >      >      >>> Flink runner, this would be implemented by delaying
>>> >     processing
>>> >      >     until
>>> >      >      >>> the current checkpoint is done.
>>> >      >      >>
>>> >      >      >> I don't think that works because we have no control over
>>> >     the Kafka
>>> >      >      >> transactions. Imagine:
>>> >      >      >>
>>> >      >      >> 1) ExactlyOnceWriter writes records to Kafka and
>>> commits,
>>> >     then
>>> >      >     starts
>>> >      >      >> a new transaction.
>>> >      >      >> 2) Flink checkpoints, delaying the processing of
>>> >     elements, the
>>> >      >      >> checkpoint fails.
>>> >      >      >> 3) We restore from an old checkpoint and will start
>>> writing
>>> >      >     duplicate
>>> >      >      >> data to Kafka. The de-duplication that the sink performs
>>> >     does not
>>> >      >      >> help, especially because the random shard ids might be
>>> >     assigned
>>> >      >      >> differently.
>>> >      >      >>
>>> >      >      >> IMHO we have to have control over commit to be able to
>>> >     provide EOS.
>>> >      >      >>
>>> >      >      >>> When we discussed this in Aug 2017, the understanding
>>> >     was that 2
>>> >      >      >>> Phase commit utility in Flink used to implement Flink's
>>> >     Kafka EOS
>>> >      >      >>> could not be implemented in Beam's context.
>>> >      >      >>
>>> >      >      >> That's also my understanding, unless we change the
>>> interface.
>>> >      >      >>
>>> >      >      >>> I don't see how SDF solves this problem..
>>> >      >      >>
>>> >      >      >> SDF has a checkpoint method which the Runner can call,
>>> >     but I think
>>> >      >      >> that you are right, that the above problem would be the
>>> same.
>>> >      >      >>
>>> >      >      >>> Absolutely. I would love to support EOS in KafkaIO for
>>> >     Flink. I
>>> >      >     think
>>> >      >      >>> that will help many future exactly-once sinks.. and
>>> address
>>> >      >      >>> fundamental incompatibility between Beam model and
>>> Flink's
>>> >      >     horizontal
>>> >      >      >>> checkpointing for such applications.
>>> >      >      >>
>>> >      >      >> Great :)
>>> >      >      >>
>>> >      >      >>> The FlinkRunner would need to insert the "wait until
>>> >     checkpoint
>>> >      >      >>> finalization" logic wherever it sees
>>> @RequiresStableInput,
>>> >      >     which is
>>> >      >      >>> already what it would have to do.
>>> >      >      >>
>>> >      >      >> I don't think that fixes the problem. See above example.
>>> >      >      >>
>>> >      >      >> Thanks,
>>> >      >      >> Max
>>> >      >      >>
>>> >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
>>> >      >      >>>
>>> >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
>>> >      >      >>>
>>> >      >      >>>         I'm not sure what a hard fail is. I probably
>>> >     have a shallow
>>> >      >      >>>         understanding, but doesn't
>>> @RequiresStableInput work
>>> >      >     for 2PC?
>>> >      >      >>>         The preCommit() phase should establish the
>>> >     transaction and
>>> >      >      >>>         commit() is not called until after checkpoint
>>> >      >     finalization. Can
>>> >      >      >>>         you describe the way that it does not work a
>>> >     little bit
>>> >      >     more?
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>     - preCommit() is called before checkpoint. Kafka
>>> EOS in
>>> >      >     Flink starts
>>> >      >      >>>     the transaction before this and makes sure it
>>> >     flushes all
>>> >      >     records in
>>> >      >      >>>     preCommit(). So far good.
>>> >      >      >>>     - commit is called after checkpoint is persisted.
>>> >     Now, imagine
>>> >      >      >>>     commit() fails for some reason. There is no option
>>> >     to rerun
>>> >      >     the 1st
>>> >      >      >>>     phase to write the records again in a new
>>> >     transaction. This
>>> >      >     is a
>>> >      >      >>>     hard failure for the job. In practice Flink
>>> might
>>> >      >     attempt to
>>> >      >      >>>     commit again (not sure how many times), which is
>>> >     likely to
>>> >      >     fail and
>>> >      >      >>>     eventually results in job failure.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>> In Apache Beam, the records could be stored in state,
>>> >     and can be
>>> >      >      >>> written inside commit() to work around this issue. It
>>> >     could have
>>> >      >      >>> scalability issues if checkpoints are not frequent
>>> >     enough in Flink
>>> >      >      >>> runner.
>>> >      >      >>>
>>> >      >      >>> Raghu.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>         Kenn
>>> >      >      >>>
>>> >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
>>> >      >      >>>
>>> >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
>>> >      >      >>>
>>> >      >      >>>                 I believe the way you would implement
>>> >     the logic
>>> >      >     behind
>>> >      >      >>>                 Flink's KafkaProducer would be to have
>>> >     two steps:
>>> >      >      >>>
>>> >      >      >>>                 1. Start transaction
>>> >      >      >>>                 2. @RequiresStableInput Close
>>> transaction
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>             I see.  What happens if closing the
>>> transaction
>>> >      >     fails in
>>> >      >      >>>             (2)? Flink's 2PC requires that commit()
>>> should
>>> >      >     never hard
>>> >      >      >>>             fail once preCommit() succeeds. I think
>>> that is
>>> >      >     cost of not
>>> >      >      >>>             having an extra shuffle. It is alright
>>> since
>>> >     this
>>> >      >     policy has
>>> >      >      >>>             worked well for Flink so far.
>>> >      >      >>>
>>> >      >      >>>             Overall, it will be great to have
>>> >     @RequiresStableInput
>>> >      >      >>>             support in Flink runner.
>>> >      >      >>>
>>> >      >      >>>             Raghu.
>>> >      >      >>>
>>> >      >      >>>                 The FlinkRunner would need to insert
>>> the
>>> >     "wait
>>> >      >     until
>>> >      >      >>>                 checkpoint finalization" logic
>>> wherever it
>>> >      >      >>>                 sees @RequiresStableInput, which is
>>> >     already what it
>>> >      >      >>>                 would have to do.
>>> >      >      >>>
>>> >      >      >>>                 This matches the KafkaProducer's logic
>>> -
>>> >     delay
>>> >      >     closing
>>> >      >      >>>                 the transaction until checkpoint
>>> >     finalization. This
>>> >      >      >>>                 answers my main question, which is "is
>>> >      >      >>>                 @RequiresStableInput expressive enough
>>> >     to allow
>>> >      >      >>>                 Beam-on-Flink to have exactly once
>>> behavior
>>> >      >     with the
>>> >      >      >>>                 same performance characteristics as
>>> >     native Flink
>>> >      >      >>>                 checkpoint finalization?"
>>> >      >      >>>
>>> >      >      >>>                 Kenn
>>> >      >      >>>
>>> >      >      >>>                 [1]
>>> https://github.com/apache/beam/pull/7955
>>> >      >      >>>
>>> >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                             Now why does the Flink
>>> >     Runner not
>>> >      >     support
>>> >      >      >>>                             KafkaIO EOS? Flink's native
>>> >      >      >>>                             KafkaProducer supports
>>> >     exactly-once. It
>>> >      >      >>>                             simply commits the pending
>>> >      >      >>>                             transaction once it has
>>> >     completed a
>>> >      >      >>> checkpoint.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
>>> >      >      >>>
>>> >      >      >>>                             Hi,
>>> >      >      >>>
>>> >      >      >>>                             I came across KafkaIO's
>>> Runner
>>> >      >     whitelist [1]
>>> >      >      >>>                             for enabling exactly-once
>>> >      >      >>>                             semantics (EOS). I think
>>> it is
>>> >      >     questionable
>>> >      >      >>>                             to exclude Runners from
>>> >      >      >>>                             inside a transform, but I
>>> >     see that the
>>> >      >      >>>                             intention was to save
>>> users from
>>> >      >      >>>                             surprises.
>>> >      >      >>>
>>> >      >      >>>                             Now why does the Flink
>>> >     Runner not
>>> >      >     support
>>> >      >      >>>                             KafkaIO EOS? Flink's native
>>> >      >      >>>                             KafkaProducer supports
>>> >     exactly-once. It
>>> >      >      >>>                             simply commits the pending
>>> >      >      >>>                             transaction once it has
>>> >     completed a
>>> >      >      >>> checkpoint.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                         When we discussed this in Aug
>>> >     2017, the
>>> >      >      >>>                         understanding was that 2 Phase
>>> >     commit
>>> >      >     utility in
>>> >      >      >>>                         Flink used to implement Flink's
>>> >     Kafka
>>> >      >     EOS could
>>> >      >      >>>                         not be implemented in Beam's
>>> >     context.
>>> >      >      >>>                         See this message
>>> >      >      >>>
>>> >     <https://www.mail-archive.com/[email protected]/msg02664.html
>>> > in
>>> >      >      >>>                         that dev thread. Has anything
>>> >     changed
>>> >      >     in this
>>> >      >      >>>                         regard? The whole thread is
>>> >     relevant to
>>> >      >     this
>>> >      >      >>>                         topic and worth going through.
>>> >      >      >>>
>>> >      >      >>>                     I think that TwoPhaseCommit utility
>>> >     class
>>> >      >     wouldn't
>>> >      >      >>>                     work. The Flink runner would
>>> >     probably want to
>>> >      >      >>>                     directly use notifySnapshotComplete
>>> >     in order to
>>> >      >      >>>                     implement @RequiresStableInput.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                             A checkpoint is realized by
>>> >     sending
>>> >      >     barriers
>>> >      >      >>>                             through all channels
>>> >      >      >>>                             starting from the source
>>> until
>>> >      >     reaching all
>>> >      >      >>>                             sinks. Every operator
>>> >      >      >>>                             persists its state once it
>>> has
>>> >      >     received a
>>> >      >      >>>                             barrier on all its input
>>> >      >      >>>                             channels, it then forwards
>>> >     it to the
>>> >      >      >>>                             downstream operators.
>>> >      >      >>>
>>> >      >      >>>                             The architecture of Beam's
>>> >      >      >>>                             KafkaExactlyOnceSink is as
>>> >     follows[2]:
>>> >      >      >>>
>>> >      >      >>>                             Input ->
>>> AssignRandomShardIds ->
>>> >      >     GroupByKey
>>> >      >      >>>                             -> AssignSequenceIds ->
>>> >      >      >>>                             GroupByKey ->
>>> ExactlyOnceWriter
>>> >      >      >>>
>>> >      >      >>>                             As I understood, Spark or
>>> >     Dataflow
>>> >      >     use the
>>> >      >      >>>                             GroupByKey stages to
>>> persist
>>> >      >      >>>                             the input. That is not
>>> >     required in
>>> >      >     Flink to
>>> >      >      >>>                             be able to take a
>>> consistent
>>> >      >      >>>                             snapshot of the pipeline.
>>> >      >      >>>
>>> >      >      >>>                             Basically, for Flink we
>>> >     don't need
>>> >      >     any of
>>> >      >      >>>                             that magic that KafkaIO
>>> does.
>>> >      >      >>>                             What we would need to
>>> >     support EOS
>>> >      >     is a way
>>> >      >      >>>                             to tell the
>>> ExactlyOnceWriter
>>> >      >      >>>                             (a DoFn) to commit once a
>>> >      >     checkpoint has
>>> >      >      >>>                             completed.
>>> >      >      >>>
>>> >      >      >>>                             I know that the new version
>>> >     of SDF
>>> >      >     supports
>>> >      >      >>>                             checkpointing which should
>>> >      >      >>>                             solve this issue. But
>>> there is
>>> >      >     still a lot
>>> >      >      >>>                             of work to do to make this
>>> >      >      >>>                             reality.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                         I don't see how SDF solves this
>>> >      >      >>>                         problem. Maybe pseudocode would make
>>> >      >      >>>                         it clearer. But if it helps, that is
>>> >      >      >>>                         great!
>>> >      >      >>>
>>> >      >      >>>                             So I think it would make
>>> >     sense to think
>>> >      >      >>>                             about a way to make
>>> KafkaIO's
>>> >      >      >>>                             EOS more accessible to
>>> Runners
>>> >      >     which support
>>> >      >      >>>                             a different way of
>>> >      >      >>>                             checkpointing.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                         Absolutely. I would love to
>>> >     support EOS in
>>> >      >      >>>                         KafkaIO for Flink. I think that
>>> will
>>> >      >     help many
>>> >      >      >>>                         future exactly-once sinks.. and
>>> >     address
>>> >      >      >>>                         fundamental incompatibility
>>> between
>>> >      >     Beam model
>>> >      >      >>>                         and Flink's horizontal
>>> checkpointing
>>> >      >     for such
>>> >      >      >>>                         applications.
>>> >      >      >>>
>>> >      >      >>>                         Raghu.
>>> >      >      >>>
>>> >      >      >>>                             Cheers,
>>> >      >      >>>                             Max
>>> >      >      >>>
>>> >      >      >>>                             PS: I found this document
>>> about
>>> >      >      >>>                             RequiresStableInput [3],
>>> but
>>> >     IMHO
>>> >      >      >>>                             defining an annotation only
>>> >      >     manifests the
>>> >      >      >>>                             conceptual difference
>>> between
>>> >      >      >>>                             the Runners.
>>> >      >      >>>
>>> >      >      >>>
>>> >      >      >>>                             [1]
>>> >      >      >>>
>>> >      >
>>> >
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>> >      >
>>> >      >      >>>
>>> >      >      >>>                             [2]
>>> >      >      >>>
>>> >      >
>>> >
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>> >      >
>>> >      >      >>>
>>> >      >      >>>                             [3]
>>> >      >      >>>
>>> >      >
>>> >
>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>> >      >
>>> >      >      >>>
>>> >      >      >>>
>>> >      >
>>> >
>>>
>>
