Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-04 Thread Pushkar Deole
Thanks John... appreciate your inputs and suggestions. I have recently been
assigned to this task (of persisting the cache) and wasn't involved in the
original design and architecture, and I agree with all the issues you have
highlighted.
However, at this point, I don't think the application can be converted to
Streams, since the design is not flexible and it would require a lot of code
rewriting plus subsequent testing.

My first thought was to use an external database, preferably a distributed
caching system like Apache Ignite, since that would have the least impact on
performance. Going to a database for every event would hurt throughput a lot;
a distributed cache (key/value pairs) would probably have a comparatively
smaller impact.
The second choice is to go for a GlobalKTable; however, this needs to be done
very carefully.
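
As a rough illustration of that second option, a GlobalKTable-based topology
could look like the sketch below. The topic names, the plain String payloads,
and the join logic are assumptions for illustration only, not taken from the
actual application.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Produced;

public class GlobalTableEnrichmentSketch {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Every instance keeps a full, read-only replica of the admin topic,
        // keyed by agent id (e.g. "10" -> "John Doe").
        GlobalKTable<String, String> admins =
            builder.globalTable("admin-topic",
                                Consumed.with(Serdes.String(), Serdes.String()));

        builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()))
               // the key selector extracts the agent id from each event; here
               // the event value itself is assumed to be the agent id
               .join(admins,
                     (eventKey, eventValue) -> eventValue,
                     (eventValue, agentNames) -> eventValue + " " + agentNames)
               .to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));

        return builder;
    }
}

The caveat John describes below still applies: the global store is updated
asynchronously, so an event can be processed before the matching admin record
is visible on a given instance.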

Thanks again!

On Mon, May 4, 2020 at 11:18 PM Pushkar Deole  wrote:

> Thanks John... what parameters would affect the latency if a GlobalKTable
> is used, and are there any configurations that could be tuned to minimize
> the latency of syncing with the input topic?
>
> On Mon, May 4, 2020 at 10:20 PM John Roesler  wrote:
>
>> Hello Pushkar,
>>
>> Yes, that’s correct. The operation you describe is currently not
>> supported. If you want to keep the structure you described in place, I’d
>> suggest using an external database for the admin objects. I’ll give another
>> idea below.
>>
>> With your current architecture, I’m a little concerned about data races.
>> From what I saw, nothing would prevent processing stream records with agent
>> 10 before you process the admin record with agent 10. This problem will
>> persist no matter where you locate the cache.
>>
>> GlobalKTable would no doubt make it worse, since it increases the latency
>> before admin record 10 is queriable everywhere.
>>
>> I think you’ll want to make a call between architecture simplicity
>> (remote cache or global KTable) vs the probability of missed joins.
>>
>> I think the “best” way to solve this problem (that comes to mind anyway)
>> might be to
>> 1. Repartition the stream to be co-partitioned with the admin records.
>> 2. Do a local (not global) stream-table join
>> 3. Enable task idling
>>
>> You can do the repartition today with a ‘map’ or ‘selectKey’ to make the
>> agent Id the new key of the stream, and then use ‘through’, (where the
>> intermediate topic has the same number of partitions as the admin topic) to
>> do the repartitioning. In 2.6, there is a “repartition” operator that will
>> make this easier.
>>
>> The repartition ensures that all stream records with agent id 10 will be
>> processed by the same thread that processes the admin records with agent id
>> 10, hence it will be able to find agent 10 in the local KTable store.
>>
>> Task idling will minimize your chances of missing any enrichments. When a
>> task has two inputs (E.g., your repartitioned stream joining with the admin
>> table), it makes Streams wait until both inputs are buffered before
>> processing, so it can do a better job of processing in timestamp order.
>>
>> I hope this helps!
>> -John
>>
>> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
>> > If i understand correctly, Kafka is not designed to provide replicated
>> > caching mechanism wherein the updates to cache will be synchronous
>> across
>> > multiple cache instances.
>> >
>> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole 
>> wrote:
>> >
>> > > Thanks John.
>> > >
>> > > Actually, this is a normal consumer-producer application wherein
>> there are
>> > > 2 consumers (admin consumer and main consumer) consuming messages
>> from 2
>> > > different topics.
>> > > One of the consumers consumes messages from a admin topic and
>> populates
>> > > data in a cache e.g. lets say agent with agent id 10 for which the
>> first
>> > > name and last name is received is populated in cache. When the other
>> > > consumer consumes message and it has agent id 10 then it reads the
>> cache,
>> > > appends the first name and last name and then sends enriched event to
>> > > producer.
>> > > In this case, each application instance consumes all the events from
>> admin
>> > > topic (unique consumer id) and keeps them in the cache in memory.
>> > > Now the requirement is to persist the cache and make is shared
>> between the
>> > > application instances, so each instance would consume partitions of
>> admin
>> > > topic and write to admin cache.
>> > >
>> > > If we want to use kafka streams, the application is so much evolved
>> that
>> > > it is difficult to migrate to streams at this stage. Secondly, from
>> past
>> > > mail chains, streams also won't serve the requirement since local
>> state
>> > > stores would just hold the local state of admin data and the cache
>> written
>> > > by each instance won't be shared with other instances.
>> > >
>> > > Global state stores may help but again it requires writing to the
>> topic
>> > > which is then synced with the state stores in the instan

Re: FAILED tasks in connector status

2020-05-04 Thread Abdoulaye Diallo
Here is the log on the app side when the worker tries to start the failing
task.

{
  "timestamp": "2020-05-04T23:22:37,759Z",
  "level": "ERROR",
  "thread": "pool-7-thread-3",
  "logger": "org.apache.kafka.connect.runtime.Worker",
  "timestamp": "2020-05-04T16:22:37,759",
  "message": "Failed to start task MyTopic-75",
  "throwable": {
    "class": "java.lang.NullPointerException",
    "msg": null,
    "stack": [
      "org.apache.kafka.common.config.AbstractConfig.propsToMap(AbstractConfig.java:98)",
      "org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:59)",
      "org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:73)",
      "org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:204)",
      "org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)",
      "org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:414)",
      "org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)",
      "org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)",
      "org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)",
      "org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)",
      "java.util.concurrent.FutureTask.run(FutureTask.java:264)",
      "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)",
      "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)",
      "java.lang.Thread.run(Thread.java:834)"
    ]
  }
}
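
For reference, once the underlying cause is addressed, a FAILED task can be
restarted through the Connect REST API without bouncing the worker. A minimal
sketch using Java 11's HttpClient; the host, port, connector name and task id
below are placeholders, not taken from this cluster:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestartFailedTask {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // POST /connectors/{connector}/tasks/{taskId}/restart
        HttpRequest restart = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/tasks/28/restart"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<Void> restartResponse =
                client.send(restart, HttpResponse.BodyHandlers.discarding());
        System.out.println("restart -> HTTP " + restartResponse.statusCode());

        // GET /connectors/{connector}/status to confirm the task is RUNNING again
        HttpRequest status = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/status"))
                .GET()
                .build();
        System.out.println(client.send(status, HttpResponse.BodyHandlers.ofString()).body());
    }
}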


On Mon, May 4, 2020 at 7:01 PM Abdoulaye Diallo 
wrote:

> Hi community,
>
> I have a connect cluster deployed in a 'cloud-like' environment where an
> instance can die anytime but a new instance gets automatically re-spawn
> immediately after(within the min...). This obviously leads to an eager
> rebalance - I am on Kafka 2.1.1 for client and server.
>
> What happens after rebalance is completed is my connector is running full
> speed, all partitions are being consumed, no lag - But:
>
> - now some of my tasks are carrying an extra partition.
> - some of the tasks(the same number as above) fail with this error because
> of this error.
>
> {"id":28,"state":"FAILED","worker_id":"hostname.zyx.com:310563","trace":"java.lang.NullPointerException\n\tat
>  
> org.apache.kafka.common.config.AbstractConfig.propsToMap(AbstractConfig.java:98)\n\tat
>  
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:59)\n\tat
>  org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:51)\n\tat 
> org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:418)\n\tat 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)\n\tat
>  
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)\n\tat
>  
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)\n\tat
>  
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)\n\tat
>
> Basically, it looks like the partition reassignment has re-assigned those
> some partitions to 2 different workers.
>
> Any pointers as to what direction to look or what might have happened is
> appreciated.
>
> Kafka version: 2.1.1
>
> Thank you,
> --
> Abdoulaye Diallo
>


-- 
Abdoulaye Diallo


FAILED tasks in connector status

2020-05-04 Thread Abdoulaye Diallo
Hi community,

I have a Connect cluster deployed in a 'cloud-like' environment where an
instance can die at any time, but a new instance gets automatically
re-spawned immediately after (within the min...). This obviously leads to an
eager rebalance - I am on Kafka 2.1.1 for client and server.

After the rebalance is completed, my connector is running at full speed, all
partitions are being consumed, no lag - but:

- now some of my tasks are carrying an extra partition.
- some of the tasks (the same number as above) fail with the error below.

{"id":28,"state":"FAILED","worker_id":"hostname.zyx.com:310563","trace":"java.lang.NullPointerException\n\tat
org.apache.kafka.common.config.AbstractConfig.propsToMap(AbstractConfig.java:98)\n\tat
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:59)\n\tat
org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:51)\n\tat
org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:418)\n\tat
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)\n\tat
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)\n\tat
org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)\n\tat
org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)\n\tat

Basically, it looks like the partition reassignment has assigned some of
those partitions to 2 different workers.

Any pointers as to what direction to look in, or what might have happened,
would be appreciated.

Kafka version: 2.1.1

Thank you,
-- 
Abdoulaye Diallo


Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-04 Thread Pushkar Deole
Thanks John... what parameters would affect the latency if a GlobalKTable is
used, and are there any configurations that could be tuned to minimize the
latency of syncing with the input topic?

On Mon, May 4, 2020 at 10:20 PM John Roesler  wrote:

> Hello Pushkar,
>
> Yes, that’s correct. The operation you describe is currently not
> supported. If you want to keep the structure you described in place, I’d
> suggest using an external database for the admin objects. I’ll give another
> idea below.
>
> With your current architecture, I’m a little concerned about data races.
> From what I saw, nothing would prevent processing stream records with agent
> 10 before you process the admin record with agent 10. This problem will
> persist no matter where you locate the cache.
>
> GlobalKTable would no doubt make it worse, since it increases the latency
> before admin record 10 is queriable everywhere.
>
> I think you’ll want to make a call between architecture simplicity (remote
> cache or global KTable) vs the probability of missed joins.
>
> I think the “best” way to solve this problem (that comes to mind anyway)
> might be to
> 1. Repartition the stream to be co-partitioned with the admin records.
> 2. Do a local (not global) stream-table join
> 3. Enable task idling
>
> You can do the repartition today with a ‘map’ or ‘selectKey’ to make the
> agent Id the new key of the stream, and then use ‘through’, (where the
> intermediate topic has the same number of partitions as the admin topic) to
> do the repartitioning. In 2.6, there is a “repartition” operator that will
> make this easier.
>
> The repartition ensures that all stream records with agent id 10 will be
> processed by the same thread that processes the admin records with agent id
> 10, hence it will be able to find agent 10 in the local KTable store.
>
> Task idling will minimize your chances of missing any enrichments. When a
> task has two inputs (E.g., your repartitioned stream joining with the admin
> table), it makes Streams wait until both inputs are buffered before
> processing, so it can do a better job of processing in timestamp order.
>
> I hope this helps!
> -John
>
> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> > If i understand correctly, Kafka is not designed to provide replicated
> > caching mechanism wherein the updates to cache will be synchronous across
> > multiple cache instances.
> >
> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole 
> wrote:
> >
> > > Thanks John.
> > >
> > > Actually, this is a normal consumer-producer application wherein there
> are
> > > 2 consumers (admin consumer and main consumer) consuming messages from
> 2
> > > different topics.
> > > One of the consumers consumes messages from a admin topic and populates
> > > data in a cache e.g. lets say agent with agent id 10 for which the
> first
> > > name and last name is received is populated in cache. When the other
> > > consumer consumes message and it has agent id 10 then it reads the
> cache,
> > > appends the first name and last name and then sends enriched event to
> > > producer.
> > > In this case, each application instance consumes all the events from
> admin
> > > topic (unique consumer id) and keeps them in the cache in memory.
> > > Now the requirement is to persist the cache and make is shared between
> the
> > > application instances, so each instance would consume partitions of
> admin
> > > topic and write to admin cache.
> > >
> > > If we want to use kafka streams, the application is so much evolved
> that
> > > it is difficult to migrate to streams at this stage. Secondly, from
> past
> > > mail chains, streams also won't serve the requirement since local state
> > > stores would just hold the local state of admin data and the cache
> written
> > > by each instance won't be shared with other instances.
> > >
> > > Global state stores may help but again it requires writing to the topic
> > > which is then synced with the state stores in the instances and the
> > > instances may not be in sync with each.
> > > I am not sure if this would cause any inconsistencies since i don't
> know
> > > how the events would flow from source e.g. if admin data is consumed
> by one
> > > instance which then modified the topic but it is not yet synced to all
> the
> > > global state stores and the next event arrived on the main consumer on
> a
> > > different instance and it tried to read from store cache then it
> doesn't
> > > get the data, so the event passed on without enriched data.
> > > That's pretty much about the use case.
> > >
> > >
> > > On Sun, May 3, 2020 at 9:42 PM John Roesler 
> wrote:
> > >
> > >> Hi Pushkar,
> > >>
> > >> I’ve been wondering if we should add writable tables to the Streams
> api.
> > >> Can you explain more about your use case and how it would integrate
> with
> > >> your application?
> > >>
> > >> Incidentally, this would also help us provide more concrete advice.
> > >>
> > >> Thanks!
> > >> John
> > >>
> > >> On Fri, May

Re: Kafka: Messages disappearing from topics, largestTime=0

2020-05-04 Thread JP MB
Here are the startup logs from a deployment where we lost 15 messages in
topic-p:
https://gist.github.com/josebrandao13/81271140e59e28eda7aaa777d2d3b02c

.timeindex files state before the deployment:
* Partitions with messages: timestamp mismatch
* Partitions without messages: permission denied

.timeindex files state after the deployment:
* All partitions without messages: permission denied, new files were created.

I don't see anything particular in the logs, but you can see that the
messages deleted with largestTime=0 are from this morning.

On Mon, May 4, 2020 at 11:37 AM JP MB 
wrote:

> Hi guys,
>
> I'm gonna get back to this today, I get mixed feelings regarding the
> volumes being the cause. This volume switching is around for quite some
> time, in a lot of clusters, and we only started noticing this problem when
> we updated some of them. Also, this only happens in *a few* of those
> .timeindex files and not in all of them. The .log files or .index files
> which are also on the volumes don't also have the problem.
>
> Additionally, I'm a bit confused on what should be the initial state of
> those .timeindex files. Sometimes I see "found log offset: -1", others the
> timestamp mismatch error "Index timestamp: 0, log timestamp: 1588583643582"
> and sometimes something like this "Indexed offset: 0, found log offset: 28".
>
> So we have seen previously that whenever the timestamp mismatch error is
> present we lose messages. eventually after a deployment. Since this looks
> like the trigger for the problem I would like to understand how it can
> happen. So my question is, how can each of those different states of
> initialization for the .timeindexes happen? We can reproduce all of them
> when running Kafka locally.
>
> Meanwhile, I'm trying to reproduce the situation in our dev environments
> and get some startup logs and I will play with the log flush settings.
>
> Regards
>
> On Sat, May 2, 2020 at 2:45 PM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
>> Good luck JP, do try it with the volume switching commented out, and see
>> how it goes.
>>
>> On Fri, May 1, 2020 at 6:50 PM JP MB  wrote:
>>
>> > Thank you very much for the help anyway.
>> >
>> > Best regards
>> >
>> > On Fri, May 1, 2020, 00:54 Liam Clarke-Hutchinson <
>> > liam.cla...@adscale.co.nz>
>> > wrote:
>> >
>> > > So the logs show a healthy shutdown, so we can eliminate that as an
>> > issue.
>> > > I would look next at the volume management during a rollout based on
>> the
>> > > other error messages you had earlier about permission denied etc. It's
>> > > possible there's some journalled but not flushed changes in those time
>> > > indexes, but at this point we're getting into filesystem internals
>> which
>> > > aren't my forte. But if you can temporarily disable the volume
>> switching
>> > > and do a test roll out, see if you get the same problems or not, would
>> > help
>> > > eliminate it or confirm it.
>> > >
>> > > Sorry I can't help further on that.
>> > >
>> > > On Fri, May 1, 2020 at 5:34 AM JP MB 
>> wrote:
>> > >
>> > > > I took a bit because I needed logs of the server shutting down when
>> > this
>> > > > occurs. Here they are, I can see some errors:
>> > > >
>> https://gist.github.com/josebrandao13/e8b82469d3e9ad91fbf38cf139b5a726
>> > > >
>> > > > Regarding systemd, the closest I could find to TimeoutStopSec was
>> > > > DefaultTimeoutStopUSec=1min 30s that looks to be 90seconds. I could
>> not
>> > > > find any KillSignal or RestartKillSignal. You can see the output of
>> > > > systemctl show --all here:
>> > > >
>> https://gist.github.com/josebrandao13/f2dd646fab19b19f127981fce92d78c4
>> > > >
>> > > > Once again, thanks for the help.
>> > > >
>> > > > On Thu, Apr 30, 2020 at 3:04 PM Liam Clarke-Hutchinson <
>> > > > liam.cla...@adscale.co.nz> wrote:
>> > > >
>> > > > > I'd also suggest eyeballing your systemd conf to verify that
>> someone
>> > > > hasn't
>> > > > > set a very low TimeoutStopSec, or that
>> KillSignal/RestartKillSignal
>> > > > haven't
>> > > > > been configured to SIGKILL (confusingly named, imo, as the default
>> > for
>> > > > > KillSignal is SIGTERM).
>> > > > >
>> > > > > Also, the Kafka broker logs at shutdown look very different if it
>> > shut
>> > > > down
>> > > > > currently vs if it didn't. Could you perhaps put them in a Gist
>> and
>> > > email
>> > > > > the link?
>> > > > >
>> > > > > Just trying to make sure basic assumptions are holding :)
>> > > > >
>> > > > > On Fri, 1 May 2020, 1:21 am JP MB, 
>> > wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > > It's quite a complex script generated with ansible where we use
>> a/b
>> > > > > > deployment and honestly, I don't have full knowledge on it I can
>> > > share
>> > > > > the
>> > > > > > general guidelines of what is done:
>> > > > > >
>> > > > > > > - Any old volumes (from previous releases are removed) (named
>> > with
>> > > > > suffix
>> > > > > > > '-old')
>> > > > > > > - Detach t

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-04 Thread John Roesler
Hello Pushkar,

Yes, that’s correct. The operation you describe is currently not supported. If 
you want to keep the structure you described in place, I’d suggest using an 
external database for the admin objects. I’ll give another idea below.  

With your current architecture, I’m a little concerned about data races. From 
what I saw, nothing would prevent processing stream records with agent 10 
before you process the admin record with agent 10. This problem will persist no 
matter where you locate the cache.

GlobalKTable would no doubt make it worse, since it increases the latency 
before admin record 10 is queriable everywhere.

I think you’ll want to make a call between architecture simplicity (remote 
cache or global KTable) vs the probability of missed joins. 

I think the “best” way to solve this problem (that comes to mind anyway) might 
be to
1. Repartition the stream to be co-partitioned with the admin records.
2. Do a local (not global) stream-table join
3. Enable task idling

You can do the repartition today with a ‘map’ or ‘selectKey’ to make the agent 
Id the new key of the stream, and then use ‘through’, (where the intermediate 
topic has the same number of partitions as the admin topic) to do the 
repartitioning. In 2.6, there is a “repartition” operator that will make this 
easier. 
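
A rough sketch of steps 1 and 2 in the DSL, assuming plain String keys and
values where the event value carries the agent id; the topic names and the
enrichment logic are placeholders, not the actual application's types:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class CoPartitionedJoinSketch {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Admin records keyed by agent id, e.g. "10" -> "John Doe"
        KTable<String, String> admins =
            builder.table("admin-topic", Consumed.with(Serdes.String(), Serdes.String()));

        builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()))
               // 1. re-key the event stream by agent id (here assumed to be the value)
               .selectKey((key, value) -> value)
               // route through an intermediate topic created with the same number
               // of partitions as admin-topic, so the two sides are co-partitioned
               .through("events-by-agent", Produced.with(Serdes.String(), Serdes.String()))
               // 2. local (not global) stream-table join
               .join(admins, (event, agentNames) -> event + " " + agentNames)
               .to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));

        return builder;
    }
}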

The repartition ensures that all stream records with agent id 10 will be 
processed by the same thread that processes the admin records with agent id 10, 
hence it will be able to find agent 10 in the local KTable store. 

Task idling will minimize your chances of missing any enrichments. When a task 
has two inputs (E.g., your repartitioned stream joining with the admin table), 
it makes Streams wait until both inputs are buffered before processing, so it 
can do a better job of processing in timestamp order. 
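
Task idling (step 3) is a single config. A sketch; the application id, the
bootstrap servers and the 500 ms value are arbitrary placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class IdlingConfigSketch {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrichment-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // 3. let a task wait up to 500 ms for data on all of its inputs
        // before processing, which improves timestamp-ordered processing
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
        return props;
    }
}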

I hope this helps!
-John 

On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> If i understand correctly, Kafka is not designed to provide replicated
> caching mechanism wherein the updates to cache will be synchronous across
> multiple cache instances.
> 
> On Sun, May 3, 2020 at 10:49 PM Pushkar Deole  wrote:
> 
> > Thanks John.
> >
> > Actually, this is a normal consumer-producer application wherein there are
> > 2 consumers (admin consumer and main consumer) consuming messages from 2
> > different topics.
> > One of the consumers consumes messages from a admin topic and populates
> > data in a cache e.g. lets say agent with agent id 10 for which the first
> > name and last name is received is populated in cache. When the other
> > consumer consumes message and it has agent id 10 then it reads the cache,
> > appends the first name and last name and then sends enriched event to
> > producer.
> > In this case, each application instance consumes all the events from admin
> > topic (unique consumer id) and keeps them in the cache in memory.
> > Now the requirement is to persist the cache and make is shared between the
> > application instances, so each instance would consume partitions of admin
> > topic and write to admin cache.
> >
> > If we want to use kafka streams, the application is so much evolved that
> > it is difficult to migrate to streams at this stage. Secondly, from past
> > mail chains, streams also won't serve the requirement since local state
> > stores would just hold the local state of admin data and the cache written
> > by each instance won't be shared with other instances.
> >
> > Global state stores may help but again it requires writing to the topic
> > which is then synced with the state stores in the instances and the
> > instances may not be in sync with each.
> > I am not sure if this would cause any inconsistencies since i don't know
> > how the events would flow from source e.g. if admin data is consumed by one
> > instance which then modified the topic but it is not yet synced to all the
> > global state stores and the next event arrived on the main consumer on a
> > different instance and it tried to read from store cache then it doesn't
> > get the data, so the event passed on without enriched data.
> > That's pretty much about the use case.
> >
> >
> > On Sun, May 3, 2020 at 9:42 PM John Roesler  wrote:
> >
> >> Hi Pushkar,
> >>
> >> I’ve been wondering if we should add writable tables to the Streams api.
> >> Can you explain more about your use case and how it would integrate with
> >> your application?
> >>
> >> Incidentally, this would also help us provide more concrete advice.
> >>
> >> Thanks!
> >> John
> >>
> >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> >> > Both stores serve a different purpose.
> >> >
> >> > Regular stores allow you to store state the application computes.
> >> > Writing into the changelog is a fault-tolerance mechanism.
> >> >
> >> > Global stores hold "auxiliary" data that is provided from "outside" of the
> >> > app. There is no changelog topic, but only the input topic (that is used
> >> > to re-create the global state).
> >> 

Re: Kafka: Messages disappearing from topics, largestTime=0

2020-05-04 Thread JP MB
Hi guys,

I'm gonna get back to this today; I have mixed feelings about the volumes
being the cause. This volume switching has been around for quite some time,
in a lot of clusters, and we only started noticing this problem when we
updated some of them. Also, this only happens in *a few* of those .timeindex
files and not in all of them. The .log and .index files, which are also on
the volumes, don't have the problem either.

Additionally, I'm a bit confused about what the initial state of those
.timeindex files should be. Sometimes I see "found log offset: -1", other
times the timestamp mismatch error "Index timestamp: 0, log timestamp:
1588583643582", and sometimes something like "Indexed offset: 0, found log
offset: 28".

We have seen previously that whenever the timestamp mismatch error is
present, we eventually lose messages after a deployment. Since this looks
like the trigger for the problem, I would like to understand how it can
happen. So my question is: how can each of those different initialization
states of the .timeindexes happen? We can reproduce all of them when running
Kafka locally.

Meanwhile, I'm trying to reproduce the situation in our dev environments
and get some startup logs and I will play with the log flush settings.

Regards

On Sat, May 2, 2020 at 2:45 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Good luck JP, do try it with the volume switching commented out, and see
> how it goes.
>
> On Fri, May 1, 2020 at 6:50 PM JP MB  wrote:
>
> > Thank you very much for the help anyway.
> >
> > Best regards
> >
> > On Fri, May 1, 2020, 00:54 Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz>
> > wrote:
> >
> > > So the logs show a healthy shutdown, so we can eliminate that as an
> > issue.
> > > I would look next at the volume management during a rollout based on
> the
> > > other error messages you had earlier about permission denied etc. It's
> > > possible there's some journalled but not flushed changes in those time
> > > indexes, but at this point we're getting into filesystem internals
> which
> > > aren't my forte. But if you can temporarily disable the volume
> switching
> > > and do a test roll out, see if you get the same problems or not, would
> > help
> > > eliminate it or confirm it.
> > >
> > > Sorry I can't help further on that.
> > >
> > > On Fri, May 1, 2020 at 5:34 AM JP MB 
> wrote:
> > >
> > > > I took a bit because I needed logs of the server shutting down when
> > this
> > > > occurs. Here they are, I can see some errors:
> > > >
> https://gist.github.com/josebrandao13/e8b82469d3e9ad91fbf38cf139b5a726
> > > >
> > > > Regarding systemd, the closest I could find to TimeoutStopSec was
> > > > DefaultTimeoutStopUSec=1min 30s that looks to be 90seconds. I could
> not
> > > > find any KillSignal or RestartKillSignal. You can see the output of
> > > > systemctl show --all here:
> > > >
> https://gist.github.com/josebrandao13/f2dd646fab19b19f127981fce92d78c4
> > > >
> > > > Once again, thanks for the help.
> > > >
> > > > On Thu, Apr 30, 2020 at 3:04 PM Liam Clarke-Hutchinson <
> > > > liam.cla...@adscale.co.nz> wrote:
> > > >
> > > > > I'd also suggest eyeballing your systemd conf to verify that
> someone
> > > > hasn't
> > > > > set a very low TimeoutStopSec, or that KillSignal/RestartKillSignal
> > > > haven't
> > > > > been configured to SIGKILL (confusingly named, imo, as the default
> > for
> > > > > KillSignal is SIGTERM).
> > > > >
> > > > > Also, the Kafka broker logs at shutdown look very different if it
> > shut
> > > > down
> > > > > currently vs if it didn't. Could you perhaps put them in a Gist and
> > > email
> > > > > the link?
> > > > >
> > > > > Just trying to make sure basic assumptions are holding :)
> > > > >
> > > > > On Fri, 1 May 2020, 1:21 am JP MB, 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > > It's quite a complex script generated with ansible where we use
> a/b
> > > > > > deployment and honestly, I don't have full knowledge on it I can
> > > share
> > > > > the
> > > > > > general guidelines of what is done:
> > > > > >
> > > > > > > - Any old volumes (from previous releases are removed) (named
> > with
> > > > > suffix
> > > > > > > '-old')
> > > > > > > - Detach the volumes attached to the old host
> > > > > > > - Stop the service in the old host - uses systemctl stop kafka
> > > > > > > - Attempt to create a CNAME volume: this is a volume with the
> > same
> > > > name
> > > > > > > that will be attached to the new box. Except for very first
> run,
> > > this
> > > > > > task
> > > > > > > is used to get the information about the existing volume. (no
> > > sufix)
> > > > > > > - A new volume is created as copy of the CNAME volume (named
> with
> > > > > suffix
> > > > > > > '-new')
> > > > > > > - The new volume is attached to the host/vm (named with suffix
> > > > '-new')
> > > > > > > - The new volume is formated (except for very first run, its
> > > already
> > > > > > > formated)(named with suffi

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-04 Thread Pushkar Deole
If I understand correctly, Kafka is not designed to provide a replicated
caching mechanism wherein updates to the cache will be synchronous across
multiple cache instances.

On Sun, May 3, 2020 at 10:49 PM Pushkar Deole  wrote:

> Thanks John.
>
> Actually, this is a normal consumer-producer application wherein there are
> 2 consumers (admin consumer and main consumer) consuming messages from 2
> different topics.
> One of the consumers consumes messages from an admin topic and populates
> data in a cache, e.g. let's say an agent with agent id 10, for which the
> first name and last name are received, is populated in the cache. When the
> other consumer consumes a message that has agent id 10, it reads the cache,
> appends the first name and last name, and then sends the enriched event to
> the producer.
> In this case, each application instance consumes all the events from the
> admin topic (unique consumer id) and keeps them in an in-memory cache.
> Now the requirement is to persist the cache and make it shared between the
> application instances, so each instance would consume partitions of the
> admin topic and write to the admin cache.
>
> If we want to use Kafka Streams, the application has evolved so much that
> it is difficult to migrate to Streams at this stage. Secondly, from past
> mail chains, Streams also won't serve the requirement, since local state
> stores would just hold the local state of admin data and the cache written
> by each instance won't be shared with other instances.
>
> Global state stores may help, but again that requires writing to the topic,
> which is then synced with the state stores in the instances, and the
> instances may not be in sync with each other.
> I am not sure if this would cause any inconsistencies, since I don't know
> how the events would flow from the source: e.g. if admin data is consumed by
> one instance, which then modified the topic, but it is not yet synced to all
> the global state stores, and the next event arrived on the main consumer on
> a different instance, which tried to read from the store cache, then it
> doesn't get the data, so the event is passed on without enriched data.
> That's pretty much the use case.
>
>
> On Sun, May 3, 2020 at 9:42 PM John Roesler  wrote:
>
>> Hi Pushkar,
>>
>> I’ve been wondering if we should add writable tables to the Streams api.
>> Can you explain more about your use case and how it would integrate with
>> your application?
>>
>> Incidentally, this would also help us provide more concrete advice.
>>
>> Thanks!
>> John
>>
>> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
>> > Both stores serve a different purpose.
>> >
>> > Regular stores allow you to store state the application computes.
>> > Writing into the changelog is a fault-tolerance mechanism.
>> >
>> > Global stores hold "auxiliary" data that is provided from "outside" of the
>> > app. There is no changelog topic, but only the input topic (that is used
>> > to re-create the global state).
>> >
>> > Local stores are sharded and updates are "sync" as they don't need to be
>> > shared with anybody else.
>> >
>> > For global stores, as all instances need to be updated, updates are
>> > async (we don't know when which instance will update its own global
>> > store replica).
>> >
>> > >> Say one stream thread updates the topic for global store and starts
>> > >> processing next event wherein the processor tries to read the global
>> store
>> > >> which may not have been synced with the topic?
>> >
>> > Correct. There is no guarantee when the update to the global store will
>> > be applied. As said, global stores are not designed to hold data the
>> > application computes.
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
>> > > thanks... will try with GlobalKTable.
>> > > As a side question, I didn't really understand the significance of
>> global
>> > > state store which kind of works in a reverse way to local state store
>> i.e.
>> > > local state store is updated and then saved to changelog topic
>> whereas in
>> > > case of global state store the topic is updated first and then synced
>> to
>> > > global state store. Do these two work in sync i.e. the update to
>> topic and
>> > > global state store ?
>> > >
>> > > Say one stream thread updates the topic for global store and starts
>> > > processing next event wherein the processor tries to read the global
>> store
>> > > which may not have been synced with the topic?
>> > >
>> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax 
>> wrote:
>> > >
>> > >> Yes.
>> > >>
>> > >> A `GlobalKTable` uses a global store internally.
>> > >>
>> > >> You can also use `StreamsBuilder.addGlobalStore()` or
>> > >> `Topology.addGlobalStore()` to add a global store "manually".
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >>
>> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>> > >>> Thanks Matthias.
>> > >>> Can you elaborate on the replicated caching layer part?
>> > >>> When you say global stores, do you mean GlobalKTable created from a

Kafka consumer

2020-05-04 Thread vishnu murali
Hey Guys,

I have a topic, and in that topic I have 3000 messages.

In my Spring Boot application I want to consume the data using
@KafkaListener(), one message at a time, because I need to do some tedious
processing on that data and it may take some time.

So within this time I don't want to consume any other data.

Only after the processing is finished do I want to consume the next message
from the topic.

How can I do this?

Any ideas?
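
A minimal Spring Boot sketch of one way to do this, assuming a reasonably
recent spring-kafka (one with the per-listener "properties" attribute); the
topic name, group id and property values are placeholders. A @KafkaListener
method is invoked for one record at a time on its consumer thread, so the next
record is not handled until the method returns; capping max.poll.records at 1
and raising max.poll.interval.ms keeps a slow handler from being kicked out of
the consumer group while it works:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SlowProcessor {

    @KafkaListener(
        topics = "my-topic",
        groupId = "slow-processor",
        properties = {
            "max.poll.records=1",          // fetch a single record per poll
            "max.poll.interval.ms=600000"  // allow up to 10 minutes of work per record
        })
    public void listen(String message) {
        // tedious, long-running processing; the next record is only
        // consumed after this method returns
        process(message);
    }

    private void process(String message) {
        // ... the actual work ...
    }
}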