Hi Zhao, Do you see any other errors regarding checkpoint file? Is this
reproducible by you and if you can you enable debug log level to get more
info.​

On Thu, Aug 20, 2015 at 7:44 AM, Zhao Weinan <zhaow...@gmail.com> wrote:

> Hi Kishore Senji,
>
> I've been busy recovering some data these two days... and found that I
> maybe hit more serious problem than I thought. I lost almost all data on
> one broker at least at some time, here is some log from server.log pasted
> below, and very like the situation described by Jason and Thunder here.
> <
> http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCAA%2BBczTUBqg1-tpcUjwfZgZYZyOXC-Myuhd_2EaGkeKWkrCVUQ%40mail.gmail.com%3E
> >
>
> Any idea about this? Why Kafka got segments with no-zero LEO then said No
> checkpointed highwatermark?
>
> From broker's logs/server.log:
>
> > [2015-08-16 17:14:35,204] INFO Completed load of log xxxxxx-1 with log
> end
> > offset 863967227 (kafka.log.Log)
> > [2015-08-16 17:15:29,648] WARN Partition [xxxxxx,1] on broker 1: No
> > checkpointed highwatermark is found for partition [xxxxxx,1]
> > (kafka.cluster.Partition)
> > [2015-08-16 17:15:36,887] INFO Truncating log xxxxxx-1 to offset 0.
> > (kafka.log.Log)
> > [2015-08-16 17:15:36,887] INFO Scheduling log segment 763102206 for log
> > xxxxxx-1 for deletion. (kafka.log.Log)
> > [2015-08-16 17:15:36,888] INFO Scheduling log segment 768984638 for log
> > xxxxxx-1 for deletion. (kafka.log.Log)
> > [2015-08-16 17:15:36,888] INFO Scheduling log segment 773712058 for log
> > xxxxxx-1 for deletion. (kafka.log.Log)
> > [2015-08-16 17:15:36,888] INFO Scheduling log segment 778002546 for log
> > xxxxxx-1 for deletion. (kafka.log.Log)
> > .....
> > [2015-08-16 17:34:18,168] INFO Scheduling log segment 0 for log xxxxxx-1
> > for deletion. (kafka.log.Log)
> > [2015-08-16 17:36:37,811] WARN [ReplicaFetcherThread-0-0], Replica 1 for
> > partition [xxxxxx,1] reset its fetch offset from 791913697 to current
> > leader 0's start offset 791913697 (kafka.server.ReplicaFetcherThread)
> > [2015-08-16 17:36:37,811] ERROR [ReplicaFetcherThread-0-0], Current
> offset
> > 0 for partition [xxxxxx,1] out of range; reset offset to 791913697
> > (kafka.server.ReplicaFetcherThread)
> >
> > From broker's logs/controller.log:
>
> >  [2015-08-16 17:14:41,444] INFO [Controller 1]: Controller starting up
> > (kafka.controller.KafkaController)
> > [2015-08-16 17:14:41,492] INFO [Controller 1]: Controller startup
> complete
> > (kafka.controller.KafkaController)
> > [2015-08-16 17:16:24,850] INFO [SessionExpirationListener on 1], ZK
> > expired; shut down all controller components and try to re-elect
> > (kafka.controller.KafkaController$SessionExpirationListener)
> >
>
> 2015-08-18 23:43 GMT+08:00 Kishore Senji <kse...@gmail.com>:
>
> > Yes you are right. I misread the code. So the only thing that can explain
> > the behavior you are seeing is that may be there are many segments that
> > need to be deleted all at once. Can you try may be reducing the
> > retention.ms
> > in smaller intervals - like reduce it to 9 days from 10 days and see if
> the
> > brokers are fine.
> >
> > On Tue, Aug 18, 2015 at 12:48 AM Zhao Weinan <zhaow...@gmail.com> wrote:
> >
> > > Hi Kishore Senji,
> > >
> > > Did you constantly send messages to your test topic? Or just one time
> > send?
> > > I've just did some test, the log.lastModified is updated with every
> > message
> > > received (or every flush to disk at least). So I think if your interval
> > > between two neibouring messages is never smaller than retention.ms,
> then
> > > your current segments should never be deleted.
> > >
> > > The wired thing is the offset of one partition in my data-loss-topic
> > became
> > > to be zero, while offsets in other partition are normally minimum...
> > >
> > > 2015-08-18 14:45 GMT+08:00 Kishore Senji <kse...@gmail.com>:
> > >
> > > > It is log.deleteOldSegments(startMs - _.lastModified >
> > > > log.config.retentionMs)
> > > >
> > > > You might have missed the startMs.
> > > >
> > > > I have tested it myself. I created a test topic with retention.ms
> > equal
> > > to
> > > > 20 minutes and added some messages. Later I changed the retention.ms
> > to
> > > 2
> > > > min. I can see whenever the delete thread runs (every five min), it
> > > deletes
> > > > even the latest Segment because that Segment age is older than
> > > > retention.ms
> > > >
> > > >
> > > > On Mon, Aug 17, 2015 at 11:30 PM, Zhao Weinan <zhaow...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Kishore Senji,
> > > > >
> > > > > The size of segement file is default 1GB.
> > > > >
> > > > > According to the LogManager.scala#cleanupExpiredSegments, Kafka
> will
> > > only
> > > > > delete segments whose lastModTime is older than retention.ms, so I
> > > dont
> > > > > think this is the reason for my data loss. Actually I lost some
> data
> > in
> > > > > topic other than the topic I reduced the retention...
> > > > >
> > > > > I dont know whether destage these several GB files will cause this
> > kind
> > > > of
> > > > > system chattering, though we do use not very fancy hardwares.
> > > > >
> > > > > 2015-08-18 7:48 GMT+08:00 Kishore Senji <kse...@gmail.com>:
> > > > >
> > > > > > What is the size of the segment file? You are reducing the
> > retention
> > > > from
> > > > > > 10 days to 1 day. The moment you do this, it will delete all
> > segments
> > > > > which
> > > > > > are older than 1 day. So for example, if your latest segment is
> > older
> > > > > than
> > > > > > 1 day and if there are consumers which are still catching up (let
> > us
> > > > say
> > > > > 10
> > > > > > min lag), Kafka will roll over and delete the older segments and
> > > there
> > > > is
> > > > > > potential for data loss. One pattern could be to make sure you
> > change
> > > > > this
> > > > > > config parameter only when a new segment is created and all
> > consumers
> > > > are
> > > > > > on the new segment and also make sure all clients will be done
> with
> > > the
> > > > > > segment before the file is deleted.
> > > > > >
> > > > > > My guess is that your segment file is huge and the OS may be
> > taking a
> > > > > long
> > > > > > time to destage the file cache on to disk before letting it to be
> > > > > deleted.
> > > > > > This could be the reason for long pause which could be causing
> the
> > Zk
> > > > > > connections to be timed out.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Aug 17, 2015 at 6:59 AM Zhao Weinan <zhaow...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi Kishore Senji,
> > > > > > >
> > > > > > > Thanks for the reply.
> > > > > > >
> > > > > > > Do you have some suggestions before the fix came up? Try not to
> > > > modify
> > > > > > the
> > > > > > > retention.ms? Or disable the auto rebalance? Cause this
> problem
> > is
> > > > > 100%
> > > > > > > reproduceable in my scenario (two times got dead lock in two
> > > > > > retention.ms
> > > > > > > modification), and I even found some data loss which I'm still
> > > > looking
> > > > > > for
> > > > > > > the reason.
> > > > > > >
> > > > > > > Any idea on why shrinking the retention.ms causes the network
> > > > > unstable?
> > > > > > >
> > > > > > > And yes I use the comma for clarity :)
> > > > > > >
> > > > > > > 2015-08-17 8:59 GMT+08:00 Kishore Senji <kse...@gmail.com>:
> > > > > > >
> > > > > > > > Interesting problem you ran in to. It seems like this broker
> > was
> > > > > chosen
> > > > > > > as
> > > > > > > > the Controller and onControllerFailure() method was called.
> > This
> > > > will
> > > > > > > > schedule the checkAndTriggerPartitionRebalance method to
> > execute
> > > > > after
> > > > > > 5
> > > > > > > > seconds (when auto rebalance enabled). In the mean time this
> > > broker
> > > > > > lost
> > > > > > > > zookeeper connection and so this broker resigns from the
> > > Controller
> > > > > > > status
> > > > > > > > and so the onControllerResignation() method is called and
> this
> > > > method
> > > > > > > will
> > > > > > > > try to shutdown the auto rebalance executor.  But it is doing
> > it
> > > by
> > > > > > > holding
> > > > > > > > the lock and this is what caused the dead lock in your case.
> > > > > > > >
> > > > > > > > I do not think we need to hold the lock to shutdown the
> > executor.
> > > > > This
> > > > > > > > could be the fix we might need.
> > > > > > > >
> > > > > > > > retention.ms config parameter should not have commas in the
> > > value.
> > > > > Are
> > > > > > > you
> > > > > > > > just using it here to clarify for us.
> > > > > > > >
> > > > > > > > It so happened in your
> > > > > > > > On Sun, Aug 16, 2015 at 1:52 AM Zhao Weinan <
> > zhaow...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi guys,
> > > > > > > > >
> > > > > > > > > I got this problem, after changing one topic's config to
> > > > > > > > > retention.ms=86,400,000
> > > > > > > > > from 864,000,000, the brokers start to shedule and do
> > deletions
> > > > of
> > > > > > > > outdated
> > > > > > > > > index of that topic.
> > > > > > > > >
> > > > > > > > > Then for some reason some brokers' connection with
> zookeeper
> > > were
> > > > > > > > expired,
> > > > > > > > > suddenly lots of ERRORs showed up in logs/server.log: At
> > > > controller
> > > > > > > > > broker(id=5) are:
> > > > > > > > >
> > > > > > > > > > *ERROR [ReplicaFetcherThread-2-4], Error for partition
> > > > [XXXXX,4]
> > > > > to
> > > > > > > > > broker
> > > > > > > > > > 4:class kafka.common.NotLeaderForPartitionException
> > > > > > > > > > (kafka.server.ReplicaFetcherThread).*
> > > > > > > > > >
> > > > > > > > > At other broker which the controller broker try to fetch
> are:
> > > > > > > > >
> > > > > > > > > > *[Replica Manager on Broker 4]: Fetch request with
> > > correlation
> > > > id
> > > > > > > > 1920630
> > > > > > > > > > from client ReplicaFetcherThread-2-4 on partition
> [XXXXX,4]
> > > > > failed
> > > > > > > due
> > > > > > > > to
> > > > > > > > > > Leader not local for partition [XXXXXXX,4] on broker 4
> > > > > > > > > > (kafka.server.ReplicaManager).*
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > In controller broker's server.log there are zk
> reconnections:
> > > > > > > > >
> > > > > > > > > > *INFO Client session timed out, have not heard from
> server
> > in
> > > > > > 5126ms
> > > > > > > > for
> > > > > > > > > > sessionid 0x54e0aaa9582b8e4, closing socket connection
> and
> > > > > > attempting
> > > > > > > > > > reconnect (org.apache.zookeeper.ClientCnxn)*
> > > > > > > > > > *INFO zookeeper state changed (Disconnected)
> > > > > > > > > > (org.I0Itec.zkclient.ZkClient)*
> > > > > > > > > > *NFO Opening socket connection to server xxxxxxxxxxx.
> Will
> > > not
> > > > > > > attempt
> > > > > > > > to
> > > > > > > > > > authenticate using SASL (java.lang.SecurityException:
> > Unable
> > > to
> > > > > > > locate
> > > > > > > > a
> > > > > > > > > > login configuration) (org.apache.zookeeper.ClientCnxn)*
> > > > > > > > > > *INFO Socket connection established to xxxxxxxxxxxxx,
> > > > initiating
> > > > > > > > session
> > > > > > > > > > (org.apache.zookeeper.ClientCnxn)*
> > > > > > > > > > *INFO Session establishment complete on server
> xxxxxxxxxx,
> > > > > > sessionid
> > > > > > > =
> > > > > > > > > > 0x54e0aaa9582b8e4, negotiated timeout = 6000
> > > > > > > > > > (org.apache.zookeeper.ClientCnxn)*
> > > > > > > > > > *INFO zookeeper state changed (SyncConnected)
> > > > > > > > > > (org.I0Itec.zkclient.ZkClient)*
> > > > > > > > > >
> > > > > > > > > But on zookeeper /brokers/ids/ there is no controller
> > broker's
> > > id
> > > > > 5.
> > > > > > > > >
> > > > > > > > > Then I tried to restart the controller broker, found the
> > > process
> > > > > > won't
> > > > > > > > > quit.
> > > > > > > > >
> > > > > > > > > Then I jstacked it, found the broker process kind of
> stucked,
> > > > some
> > > > > > > > > keypoints pasted as below. It seems
> > zk-client-expired-callback
> > > > > > aquired
> > > > > > > > the
> > > > > > > > > controllerLock and wait the kafka-scheduler Executor to
> exit
> > > (for
> > > > > one
> > > > > > > > day),
> > > > > > > > > but some thread in that Executor is doing Rebalance job
> which
> > > > need
> > > > > to
> > > > > > > > > aquire the controllerLock, then the broker is in DEAD LOCK
> > and
> > > > will
> > > > > > be
> > > > > > > > > totally lost from zookeeper for ONE DAY if I'm corrected?
> And
> > > > since
> > > > > > > it's
> > > > > > > > > still hold outdated view of the cluster, it will try to try
> > to
> > > > > > follower
> > > > > > > > up
> > > > > > > > > the Leaders which maybe not actual Leader, caused the
> ERRORs
> > as
> > > > > above
> > > > > > > > > mentioned.
> > > > > > > > >
> > > > > > > > > I'm using 8 Kafka brokers @0.8.2.1 with 3 Zookeeper server
> > > > @3.4.6,
> > > > > > all
> > > > > > > > on
> > > > > > > > > different host in same data center, the cluster load is
> about
> > > > 200K
> > > > > > > > messages
> > > > > > > > > in and 30M bytes in and 80M bytes out totally.
> > > > > > > > >
> > > > > > > > > Does some one has the same issue? Any suggestion is
> > > appreciated.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > jstack:
> > > > > > > > >
> > > > > > > > > > "kafka-scheduler-0" daemon prio=10 tid=0x0000000057967800
> > > > > > nid=0x2994
> > > > > > > > > > waiting on condition [0x0000000046dac000]
> > > > > > > > > >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > > >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > > >         - parking to wait for  <0x00000000c2ec6418> (a
> > > > > > > > > > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > >
> > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> > > > > > > > > >         at kafka.utils.Utils$.inLock(Utils.scala:533)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1131)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1127)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > >
> > > >
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1127)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:326)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
> > > > > > > > > >         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > >
> > > >
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > > > > > > > > >         at
> > > > > > > > >
> > > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > > > > > > > > >         at java.lang.Thread.run(Thread.java:662)
> > > > > > > > > >
> > > > > > > > > "ZkClient-EventThread-15-xxxxxxxxxxxxxxxxxxxxx" daemon
> > prio=10
> > > > > > > > > > tid=0x00002aaab42ba800 nid=0x24c4 waiting on condition [
> > > > > > > > > > 0x00000000402e8000]
> > > > > > > > > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > > > > > > > > >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > > >         - parking to wait for  <0x00000000f4c450b8> (a
> > > > > > > > > >
> > > > > > >
> > > >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > >
> > > >
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1253)
> > > > > > > > > >         at
> > > > > > > kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > > > > > > > > >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > > >         at
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
> > > > > > > > > >         at
> > > > org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > > > > > > > > >         at
> > > > > > > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to