Thanks Chris,

I have put up a review board here
https://reviews.apache.org/r/17869/

I am new to scala/samza, but I tried to follow the style guide and other
examples in the code.


2) I could not get the configuration working, and when I looked at the
other Config classes they did not seem to be referenced anywhere except in
an import. I am not sure what I was doing wrong there.

4/6) I have moved the deserialization before the updateFetchMap() function
call, because I thought that was better than incrementing and then
decrementing.  If there is a reason I should not have done it that way I
can fix it.

Thanks


Danny



On Fri, Feb 7, 2014 at 9:56 AM, Chris Riccomini <[email protected]>wrote:

>  Hey Danny,
>
>  Nice! A few comments.
>
>    1. Could you please use JIRA (
>    https://issues.apache.org/jira/browse/SAMZA-59) review board to upload
>    your patch, and instigate discussion on the ticket? Have a look at
>    http://samza.incubator.apache.org/contribute/rules.html and
>    http://wiki.apache.org/samza/ContributorsCorner.
>    2. Regarding enum, we use a config pattern instead. You should create
>    a SystemConsumersConfig object in org.apache.samza.config (see other
>    classes in there for examples). You should then use this class in
>    SamzaContainer to get the drop on failure setting when instantiating the
>    SystemConsumers object. Something like
>    config.getDropIncomingMessagesOnSerdeFailure.
>    3. I don't think we need to distinguish between drop and log. A
>    boolean is fine. We should just debug() if drop is enabled.
>    4. It's a safer to isolate the serdeManager.fromBytes(envelope) call
>    in the try/catch block, and move
>    the unprocessedMessages(envelope.getSystemStreamPartition).enqueue(…) call
>    outside of it, just so we don't actually catch object not found exceptions
>    when accessing the map, and treat them as serde exceptions.
>    5. You are correct in assuming that you can add the param to the
>    SystemConsumers, and have SamzaContainer set it.
>    6. I don't think the "add a null message" logic is the correct way to
>    handle your "queue empty" issue. This exception is being thrown because the
>    SystemConsumer's fetch map is getting out of sync with reality when we drop
>    a message. If you look at the poll() method, you'll see that we call
>    updateFetchMap(systemStreamPartition, -1), and then try to enqueue the
>    message. The updateFetchMap call is updating a count for the number of
>    messages in the SystemConsumers' buffers. If you then fail, the message
>    never actually goes into the buffer. You need to then go and update the
>    buffers AGAIN, this time incrementing the buffers by one, since the failed
>    message never actually made it into the buffer. Try
>    adding updateFetchMap(systemStreamPartition) to your catch clause, rather
>    than adding a null message. The logic would then be, "decrement the number
>    of messages we need to fetch by one since got a message and are adding it
>    to our buffer; add the message; if the message fails to add, increment the
>    number of messages we need to fetch by one, since we never actually added
>    our failed message."
>
> Cheers,
> Chris
>
>   From: Danny Antonetti <[email protected]>
> Date: Thursday, February 6, 2014 5:49 PM
> To: "[email protected]" <[email protected]>,
> Chris Riccomini <[email protected]>
> Subject: Re: JsonSerde error handling
>
>   Hey Chris
>
>  This is my first stab at a diff for this.
>
>  I an new to scala and samza, and I am not clear on the best practices
> here.
>
>
>  I used an enumeration for the flag, but I was not sure if that is what
> you wanted or just a boolean.  I have a metric that is incremented, and
> support Drop, Fail, and Log.
>
>  I could not find a good place to configure this flag, because there is
> nothing else for this class that is configured from the property file (that
> I could tell).
>
>  I was assuming that I wanted to add the flag to the constructor which is
> called in SamzaContainer.scala?
>
>
>  I had to add an enqueue with a blank Envelope or I was getting the
> following Exception.
> So since I have to enqueue anyway (Or am I missing something)
>
>  Since I have to a new Envelope I am not sure what to put for the
> Key/Message.
>
>  Exception in thread "main" java.util.NoSuchElementException: queue empty
>       at scala.collection.mutable.Queue.dequeue(Queue.scala:47)
>       at 
> org.apache.samza.system.SystemConsumers$$anonfun$refresh$3.apply(SystemConsumers.scala:240)
>       at 
> org.apache.samza.system.SystemConsumers$$anonfun$refresh$3.apply(SystemConsumers.scala:237)
>       at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
>       at 
> org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:237)
>       at 
> org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:226)
>       at 
> org.apache.samza.container.SamzaContainer.process(SamzaContainer.scala:556)
>       at 
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:457)
>       at 
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:78)
>       at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
>
>
>
>
> On Wed, Feb 5, 2014 at 6:33 PM, Chris Riccomini 
> <[email protected]>wrote:
>
>> Hey Danny,
>>
>> Yep, pretty much.
>>
>> We should also have a counter metric to measure how many messages were
>> dropped, and a config setting to toggle the dropping behavior.
>>
>> Logging is a little tricky. I'm kind of sensitive to spewing a whole bunch
>> of "we just dropped your message from system stream partition X" log lines
>> in the container logs. It's conceivable that folks might have a ton of bad
>> messages, and logging every one would slow down the processing, and also
>> fill the disk. Alternatively, it'd be really useful to know that messages
>> had been dropped. The best I can come up with is to set it at DEBUG, so
>> it's not verbose, but still there if people want it.
>>
>> Cheers,
>> Chris
>>
>> On 2/5/14 6:13 PM, "Danny Antonetti" <[email protected]> wrote:
>>
>> >So it sounds like this would just be a try catch around this line
>> >in SystemConsumers.scala
>> >
>>
>> >unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManage
>> >r.fromBytes(envelope))
>> >
>> >And just do some logging in the catch statement?
>> >
>> >
>> >On Wed, Feb 5, 2014 at 12:32 PM, Chris Riccomini
>> ><[email protected]>wrote:
>> >
>> >> Hey Guys,
>> >>
>> >> Thinking on this more. Given that we don't have a good grasp on this,
>> >>and
>> >> it seems fairly complicated for now, I'm proposing that we hold off on
>> >>all
>> >> this complex work, and just support the ability to drop messages for
>> >>now.
>> >> This should be pretty straightforward to implement, and is generally
>> >> pretty useful, and not too invasive.
>> >>
>> >> Does that sound cool with folks? Danny?
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 2/5/14 11:43 AM, "Chris Riccomini" <[email protected]> wrote:
>> >>
>> >> >Hey Martin,
>> >> >
>> >> >I agree with you about ordering. It would be ideal to give the error()
>> >> >callback exactly when we would normally call process() on the message,
>> >>but
>> >> >can't because of the serde error. I think this is do-able if we tweak
>> >> >things in SystemConsumers a bit.
>> >> >
>> >> >The problem I'm having trouble resolving is how to handle the
>> >> >MessageChooser. The MessageChooser chooses the processing order
>> between
>> >> >partitions ("I have a message from stream/partition X, and
>> >> >stream/partition Y, which one do I process next?"). These choosers
>> >> >occasionally look at the actual message payload (e.g. The timestamp
>> >>field
>> >> >of an incoming message) when doing the choosing. The chooser won't
>> know
>> >> >how to handle an IncomingMessageEnvelope where the key and value are
>> >>both
>> >> >byte[], when it's expecting Avro, ProtoBuf, JSON, etc. One solution I
>> >>can
>> >> >think of would be to add an error() call to the MessageChooser
>> >>interface.
>> >> >The other solution would be to just give the MessageChooser
>> everything,
>> >> >and in cases where it is looking at the message payload, it just has
>> >>to be
>> >> >careful. I think there might be other solutions here as well.
>> >> >
>> >> >More thought required.
>> >> >
>> >> >Cheers,
>> >> >Chris
>> >> >
>> >> >On 2/4/14 4:13 PM, "Martin Kleppmann" <[email protected]>
>> wrote:
>> >> >
>> >> >>Ok -- I had originally misunderstood your ErrorTask proposal. The way
>> >>you
>> >> >>described it looks good to me. I agree that a single error method,
>> >>which
>> >> >>takes a generic error envelope object, is better than lots of
>> >>different
>> >> >>methods.
>> >> >>
>> >> >>Rather than
>> >> >>
>> >> >>if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())
>> >> >>
>> >> >>could we just say this?
>> >> >>
>> >> >>if (envelope.getError() instanceof DeserializationError)
>> >> >>
>> >> >>Regarding ordering: my instinctive reaction is that it would be least
>> >> >>surprising if the error is received at the same point in the message
>> >> >>sequence as the message would have been received, had it not been an
>> >> >>error. But that may need some more thinking.
>> >> >>
>> >> >>Cheers,
>> >> >>Martin
>> >> >>
>> >> >>On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]>
>> >> wrote:
>> >> >>> Hey Guys,
>> >> >>>
>> >> >>> One other thing to consider here, that didn't strike me at first is
>> >>how
>> >> >>>to
>> >> >>> handle ordering when a serialization error occurs.
>> >> >>>
>> >> >>> Right now, the SystemConsumers class reads messages in, buffers
>> >>them,
>> >> >>>and
>> >> >>> feeds them slowly to the MessageChooser. The MessageChooser takes
>> >>only
>> >> >>> IncomingMessageEnvelope right now. So the question is, if we have a
>> >> >>>serde
>> >> >>> error, when does the StreamTask see it? It shouldn't see it before
>> >> >>>other
>> >> >>> messages for the same stream partition, since this would mean we've
>> >> >>>broken
>> >> >>> the ordering of the messages. Arguably, the MessageChooser should
>> >>also
>> >> >>>get
>> >> >>> a chance to see it, to choose in what order to process it.
>> >> >>>
>> >> >>> One could make the argument, though, that a message that can't be
>> >> >>> deserialized is invalid, and you're looking at potential data loss
>> >> >>>anyway,
>> >> >>> so ordering doesn't matter. I'm not sure if this assumption is true
>> >>in
>> >> >>>all
>> >> >>> cases though.
>> >> >>>
>> >> >>> I don't really have a good answer for this at the moment. I think
>> we
>> >> >>> should think about it.
>> >> >>>
>> >> >>> Cheers,
>> >> >>> Chris
>> >> >>>
>> >> >>> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]>
>> >> >>>wrote:
>> >> >>>
>> >> >>>> OK Great, that looks good.
>> >> >>>>
>> >> >>>> I will look into this as I have time and get back to you with
>> >> >>>>questions.
>> >> >>>>
>> >> >>>>
>> >> >>>> Thanks for the help.
>> >> >>>>
>> >> >>>>
>> >> >>>> Danny
>> >> >>>>
>> >> >>>>
>> >> >>>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini
>> >> >>>> <[email protected]>wrote:
>> >> >>>>
>> >> >>>>> Hey Guys,
>> >> >>>>>
>> >> >>>>> Here's a pseudo-code proposal for an error API.
>> >> >>>>>
>> >> >>>>> interface ErrorTask {
>> >> >>>>>  public void error(ErrorEnvelope envelope, MessageCollector
>> >> >>>>>collector,
>> >> >>>>> TaskCoordinator coordinator);
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> interface ErrorEnvelope {
>> >> >>>>>  ErrorCode getErrorCode();
>> >> >>>>>  Object getError();
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> class DeserializationError {
>> >> >>>>>  byte[] getKey();
>> >> >>>>>
>> >> >>>>>  byte[] getMessage();
>> >> >>>>>  boolean keyFailed();
>> >> >>>>>  boolean valueFailed();
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> Then you could implement something like:
>> >> >>>>>
>> >> >>>>> class MyStreamTask implements StreamTask, ErrorTask {
>> >> >>>>>  public void proces(Š) {
>> >> >>>>>    // do stuff
>> >> >>>>>  }
>> >> >>>>>
>> >> >>>>>  public void error(ErrorEnvelope envelope, MessageCollector
>> >> >>>>>collector,
>> >> >>>>> TaskCoordinator coordinator) {
>> >> >>>>>    if
>> >> >>>>>(DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())
>> >> >>>>> {
>> >> >>>>>      DeserializationError error = (DeserializationError)
>> >> >>>>> envelope.getError();
>> >> >>>>>      collector.send("kafka", "bad-data", error.getKey(),
>> >> >>>>> error.getValue());
>> >> >>>>>    }
>> >> >>>>>  }
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> Arguably, we've just moved our big if-statement block from the
>> >> >>>>>process()
>> >> >>>>> method into the ErrorTask.error method, but the alternative, as I
>> >>see
>> >> >>>>> it,
>> >> >>>>> is to have on call back per error message. This moves us into a
>> >>state
>> >> >>>>> where every time we want to add a new callback, we break all
>> >>existing
>> >> >>>>> implementations (since they must now implement the new method, as
>> >> >>>>> well). I
>> >> >>>>> chose the single callback and generic object approach because it
>> >> >>>>>matches
>> >> >>>>> what our process method does when multiple input streams are
>> >>defined
>> >> >>>>>(if
>> >> >>>>> envelope is from stream A, do X, if envelope is from stream B, do
>> >>Y),
>> >> >>>>> but
>> >> >>>>> I'm open to suggestions if people have them.
>> >> >>>>>
>> >> >>>>> Regarding your JSON use case, yes, I think you'd be given the raw
>> >> >>>>>bytes
>> >> >>>>> for the key and message, and it'd be up to you to decide what to
>> >>do
>> >> >>>>>with
>> >> >>>>> them. Samza allows you to define systems with no serde. Systems
>> >> >>>>>without
>> >> >>>>> a
>> >> >>>>> serde default to byte[] for both key and value, which means that
>> >>you
>> >> >>>>> could
>> >> >>>>> define a system (or stream) with a pass-through serde, and send
>> it
>> >> >>>>>the
>> >> >>>>> raw
>> >> >>>>> bytes for the key and message. Alternatively, you could try
>> >>decoding
>> >> >>>>>the
>> >> >>>>> data using some other serde, posting the message, to a DB,
>> logging
>> >> >>>>>the
>> >> >>>>> message in Log4j, discarding the message, etc.
>> >> >>>>>
>> >> >>>>> Cheers,
>> >> >>>>> Chris
>> >> >>>>>
>> >> >>>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]
>> >
>> >> >>>>>wrote:
>> >> >>>>>
>> >> >>>>>> Hey Chris,
>> >> >>>>>>
>> >> >>>>>> I am not sure I understand your suggestion about the ErrorTask.
>> >> >>>>>>What
>> >> >>>>>> would
>> >> >>>>>> this new functions method signature be?  I would assume it would
>> >> >>>>>>take
>> >> >>>>> in
>> >> >>>>>> the byte[] from the fromBytes function.  It seems like that ties
>> >>the
>> >> >>>>> Serde
>> >> >>>>>> implementation to the StreamTask implementation.  Unless you are
>> >> >>>>>> suggesting
>> >> >>>>>> that it should be notified without giving the input byte[].
>> >> >>>>>>
>> >> >>>>>> In our case we are using the JsonSerde, so the byte[] for the
>> >>json
>> >> >>>>>>data
>> >> >>>>>> would be given to  onDeserializationError and then our task
>> would
>> >> >>>>>>have
>> >> >>>>> to
>> >> >>>>>> decode the bytes?
>> >> >>>>>>
>> >> >>>>>> Just to clarify my suggestion, I was not thinking that we would
>> >>have
>> >> >>>>>>a
>> >> >>>>>> predefined set of behaviors.  I was thinking that I would have
>> an
>> >> >>>>>> interface
>> >> >>>>>> (That would not be part of the StreamTask), maybe ErrorHandler.
>> >> >>>>>>With
>> >> >>>>> this
>> >> >>>>>> option there would be implementations of that interface for
>> >>Dropping
>> >> >>>>> the
>> >> >>>>>> message, redirect to a new Queue, or drop it.  But this would
>> >> >>>>>>require
>> >> >>>>>> extra
>> >> >>>>>> configuration as you mentioned.
>> >> >>>>>>
>> >> >>>>>> I am not invested in my approach, I just wanted to make sure
>> >>that I
>> >> >>>>>> understand the suggestions/options.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>> Thanks
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>> Danny
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini
>> >> >>>>>> <[email protected]>wrote:
>> >> >>>>>>
>> >> >>>>>>> Hey Danny,
>> >> >>>>>>>
>> >> >>>>>>> I can think of two ways to accomplish this.
>> >> >>>>>>>
>> >> >>>>>>> The first way is essentially what you've described. Allow the
>> >> >>>>> framework
>> >> >>>>>>> to
>> >> >>>>>>> have a pre-defined set of options defined via config (drop the
>> >> >>>>> message,
>> >> >>>>>>> re-route to another topic, etc).
>> >> >>>>>>>
>> >> >>>>>>> The second way is to catch the serialization issues, and still
>> >>pass
>> >> >>>>> the
>> >> >>>>>>> failed message to the StreamTask. The way in which the
>> >>StreamTask
>> >> >>>>> would
>> >> >>>>>>> be
>> >> >>>>>>> notified of the failure is up for debate. One option would be
>> to
>> >> >>>>> have a
>> >> >>>>>>> ErrorTask interface, which has an onDeserializationError call
>> >>back.
>> >> >>>>> No
>> >> >>>>>>> new
>> >> >>>>>>> configuration would be required in this case--simply
>> >>implementing
>> >> >>>>>>>the
>> >> >>>>>>> ErrorTask means you get notified of any serialization errors
>> >> >>>>>>>(rather
>> >> >>>>>>> than
>> >> >>>>>>> failing the container, which would be the default). Another
>> >>option
>> >> >>>>> would
>> >> >>>>>>> be to have the IncomingMessageEnvelope have an error flag,
>> >>which we
>> >> >>>>>>> could
>> >> >>>>>>> use to denote the serialization failure.
>> >> >>>>>>>
>> >> >>>>>>> I like the second approach because it's more generic. Instead
>> of
>> >> >>>>>>> pre-defining exact behavior when a failure occurs, it seems
>> more
>> >> >>>>>>> flexible
>> >> >>>>>>> to let the StreamTask know, and let the developer decide what
>> >>the
>> >> >>>>>>> appropriate action is. I hadn't even thought of the re-route
>> >>case
>> >> >>>>>>>you
>> >> >>>>>>> brought up, and I'm sure there are many other possible actions
>> >>that
>> >> >>>>>>> we're
>> >> >>>>>>> not thinking of right now. Of the second approach's potential
>> >> >>>>>>> implementation options, I favor the ErrorTask approach right
>> >>now,
>> >> >>>>> but I
>> >> >>>>>>> haven't dug into it too much.
>> >> >>>>>>>
>> >> >>>>>>> Regardless of which way we choose, I think the default should
>> >>be to
>> >> >>>>> fail
>> >> >>>>>>> the container. This is the safest behavior, as it means there
>> >>will
>> >> >>>>> be no
>> >> >>>>>>> data loss, and developers will be alerted of the serialization
>> >> >>>>>>>error.
>> >> >>>>>>>
>> >> >>>>>>> As far as implementation goes, The four classes that are
>> >>probably
>> >> >>>>>>>of
>> >> >>>>>>> most
>> >> >>>>>>> interest to you are SamzaContainer, TaskInstance,
>> >>StreamConsumers,
>> >> >>>>> and
>> >> >>>>>>> SerdeManager. You'll need to catch the serde exceptions
>> >>somewhere
>> >> >>>>>>>in
>> >> >>>>> the
>> >> >>>>>>> StreamConsumer or SerdeManager class, and implement your new
>> >>logic
>> >> >>>>>>> there.
>> >> >>>>>>> I think the best approach is to read over these four classes,
>> >>and
>> >> >>>>> ask us
>> >> >>>>>>> any questions you might have.
>> >> >>>>>>>
>> >> >>>>>>> Cheers,
>> >> >>>>>>> Chris
>> >> >>>>>>>
>> >> >>>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <
>> [email protected]>
>> >> >>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>>> We have discussed this, and it is something that we want to
>> >>look
>> >> >>>>> into.
>> >> >>>>>>>>
>> >> >>>>>>>> Do you have any thoughts on how to implement this feature?
>> >> >>>>>>>> I assume you would want the failure behavior to be
>> >>configurable.
>> >> >>>>>>>>
>> >> >>>>>>>> Like
>> >> >>>>>>>> Drop the message,
>> >> >>>>>>>> Send a message to a new queue, and drop.
>> >> >>>>>>>> Fail the container (is that ever appropriate?)
>> >> >>>>>>>> Anything else?
>> >> >>>>>>>>
>> >> >>>>>>>> I am not familiar with this code base.
>> >> >>>>>>>> Do you have a suggestion on what classes I should be looking
>> to
>> >> >>>>> modify?
>> >> >>>>>>>>
>> >> >>>>>>>> Is there someone who I should bounce ideas off of?
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> Thanks
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> Danny
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan
>> >><[email protected]>
>> >> >>>>> wrote:
>> >> >>>>>>>>
>> >> >>>>>>>>> It's not intentional, error handling just hasn't been added
>> >>yet.
>> >> >>>>> If
>> >> >>>>>>>>> you're
>> >> >>>>>>>>> interested, we'd love to have the contribution.  In
>> >>particular,
>> >> >>>>> take
>> >> >>>>>>> a
>> >> >>>>>>>>> look
>> >> >>>>>>>>> at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59
>> ),
>> >> >>>>> which
>> >> >>>>>>> also
>> >> >>>>>>>>> touches on how serdes should handle error conditions.
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks,
>> >> >>>>>>>>> Jakob
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>> >> >>>>>>>>> <[email protected]>wrote:
>> >> >>>>>>>>>
>> >> >>>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes
>> >>for
>> >> >>>>>>>>> serializing
>> >> >>>>>>>>>> kafka messages.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> I have noticed that if there is bad json input coming in
>> >>through
>> >> >>>>>>>>> kafka,
>> >> >>>>>>>>> the
>> >> >>>>>>>>>> samza container seems to crash.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> I was looking at JsonSerde.scala, which does not seem to
>> have
>> >> >>>>> any
>> >> >>>>>>>>> error
>> >> >>>>>>>>>> handling.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> So I was curious if this was intentional?
>> >> >>>>>>>>>> Or if there is a different way to handle these types of
>> input
>> >> >>>>>>> errors?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Thanks
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Danny
>> >> >>>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>
>> >> >>
>> >> >
>> >>
>> >>
>>
>>
>

Reply via email to