Error while creating topic using kafka-node client?

2019-01-24 Thread Rahul Singh
Hi All,

I am facing an error while creating a topic manually using the kafka-node client.
The code is below.

Can anyone help please?

let topicsToCreate = [{ topic: topicName, partitions: 1, replicationFactor: 2 }];
admin.createTopics(topicsToCreate, (err, data) => {
  if (err) {
    console.log(err);
  } else {
    console.log(data);
  }
});

The error it throws is given below:

Error: No broker with id undefined
at KafkaClient.sendRequestToBroker
(/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:1089:21)
at getController
(/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:1190:10)
at loadMetadata
(/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:287:12)
at async.series
(/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:892:7)
at /Kafka/KafkaDemo/node_modules/async/dist/async.js:3888:9
at /Kafka/KafkaDemo/node_modules/async/dist/async.js:473:16
at replenish (/Kafka/KafkaDemo/node_modules/async/dist/async.js:1006:25)
at iterateeCallback
(/Kafka/KafkaDemo/node_modules/async/dist/async.js:995:17)
at /Kafka/KafkaDemo/node_modules/async/dist/async.js:969:16
at /Kafka/KafkaDemo/node_modules/async/dist/async.js:3885:13


Kafka Broker with -XX:+PerfDisableSharedMem

2019-01-24 Thread Mark Anderson
Hi,

After reading http://www.evanjones.ca/jvm-mmap-pause.html and
https://bugs.openjdk.java.net/browse/JDK-8076103 (alongside the linked
e-mail trail) I'm considering adding this flag when running Kafka.

I'm assuming this is safe to use and there are no unintended side effects?

Does anyone have any experience of running Kafka with this flag and seeing
a reduction in pauses?

Thanks,
Mark
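For what it's worth, the flag is usually added through the environment variable that kafka-run-class.sh reads (a sketch; the default flags shown are assumptions and should be checked against your installation). One documented side effect: -XX:+PerfDisableSharedMem stops the JVM from writing its hsperfdata file, so tools like jps and jstat will no longer see the broker process.

```shell
# Keep Kafka's usual performance flags and append PerfDisableSharedMem.
# Caveat: jps/jstat lose visibility of the process once this flag is set.
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 \
  -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent \
  -Djava.awt.headless=true -XX:+PerfDisableSharedMem"
bin/kafka-server-start.sh config/server.properties
```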


Re: Error while creating topic using kafka-node client?

2019-01-24 Thread Christopher Bogan
I'm in the same boat

On Thu, Jan 24, 2019, 4:36 AM Rahul Singh <
rahul.si...@smartsensesolutions.com wrote:

> Hi All,
>
> I am facing error while creating topic manually using kafka-node client.
> The code is mentioned below.
>
> Can anyone help please?
>
> let topicsToCreate = [{ topic: topicName, partitions: 1, replicationFactor:
> 2 }];
> admin.createTopics(topicsToCreate, (err, data) => {
> if (err) {
> console.log(err)
> } else {
> console.log(data)
> }
> });
>
> The error it is throwing is given below ::
>
> Error: No broker with id undefined
> at KafkaClient.sendRequestToBroker
> (/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:1089:21)
> at getController
> (/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:1190:10)
> at loadMetadata
> (/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:287:12)
> at async.series
> (/Kafka/KafkaDemo/node_modules/kafka-node/lib/kafkaClient.js:892:7)
> at /Kafka/KafkaDemo/node_modules/async/dist/async.js:3888:9
> at /Kafka/KafkaDemo/node_modules/async/dist/async.js:473:16
> at replenish
> (/Kafka/KafkaDemo/node_modules/async/dist/async.js:1006:25)
> at iterateeCallback
> (/Kafka/KafkaDemo/node_modules/async/dist/async.js:995:17)
> at /Kafka/KafkaDemo/node_modules/async/dist/async.js:969:16
> at /Kafka/KafkaDemo/node_modules/async/dist/async.js:3885:13
>


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-24 Thread John Roesler
Hi Peter,

Thanks for the clarification.

When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
think that Streams automatically registers a shutdown hook. In our examples
and demos, we register a shutdown hook "outside" of streams (right next to
the code that calls start() ).
Unless I missed something, a SIGTERM would still cause Streams to exit
abruptly, skipping flush and commit. This can cause apparent duplicates *if
you're not using EOS or if you're reading uncommitted transactions*.

The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was emitted.
Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...

So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
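For reference, "read committed" mode is controlled by a single consumer property (a config fragment; the default is read_uncommitted):

```properties
# Only deliver records from committed transactions to the application.
isolation.level=read_committed
```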

Likewise, if you were to attach a callback, like "foreach" downstream of
the suppression, you would see duplicates in the case of a crash. Callbacks
are a general "hole" in EOS, which I have some ideas to close, but that's a
separate topic.

There may still be something else going on, but I'm trying to start with
the simpler explanations.

Thanks again,
-John


On Wed, Jan 23, 2019 at 5:11 AM Peter Levart  wrote:

> Hi John,
>
> Sorry I haven't had time to prepare the minimal reproducer yet. I still
> have plans to do it though...
>
> On 1/22/19 8:02 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Just to follow up on the actual bug, can you confirm whether:
> > * when you say "restart", do you mean orderly shutdown and restart, or
> > crash and restart?
>
> I start it as SpringBoot application from IDEA and then stop it with the
> red square button. It does initiate the shutdown sequence before
> exiting... So I think it is by SIGTERM which initiates JVM shutdown
> hook(s).
>
> > * have you tried this with EOS enabled? I can imagine some ways that
> there
> > could be duplicates, but they should be impossible with EOS enabled.
>
> Yes, I have EOS enabled.
>
> >
> > Thanks for your help,
> > -John
>
> Regards, Peter
>
> >
> > On Mon, Jan 14, 2019 at 1:20 PM John Roesler  wrote:
> >
> >> Hi Peter,
> >>
> >> I see your train of thought, but the actual implementation of the
> >> window store is structured differently from your mental model.
> >> Unlike Key/Value stores, we know that the records in a window
> >> store will "expire" on a regular schedule, and also that every single
> >> record will eventually expire. With this in mind, we have implemented
> >> an optimization to avoid a lot of compaction overhead in RocksDB, as
> >> well as saving on range scans.
> >>
> >> Instead of storing everything in one database, we open several
> >> databases and bucket windows into them. Then, when windows
> >> expire, we just ignore the records (i.e., the API makes them
> unreachable,
> >> but we don't actually delete them). Once all the windows in a database
> >> are expired, we just close and delete the whole database. Then, we open
> >> a new one for new windows. If you look in the code, these databases are
> >> called "segments".
> >>
> >> Thus, I don't think that you should attempt to use the built-in window
> >> stores
> >> as you described. Instead, it should be straightforward to implement
> your
> >> own StateStore with a layout that's more favorable to your desired
> >> behavior.
> >>
> >> You should also be able to set up the change log the way you need as
> well.
> >> Explicitly removed entities also would get removed from the log as
> well, if
> >> it's a compacted log.
> >>
> >> Actually, what you're describing is *very* similar to the implementation
> >> for suppress. I might actually suggest that you just copy the
> suppression
> >> implementation and adapt it to your needs, or at the very least, study
> >> how it works. In doing so, you might actually discover the cause of the
> >> bug yourself!
> >>
> >> I hope this helps, and thanks for your help,
> >> -John
> >>
> >>
> >> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart 
> >> wrote:
> >>
> >>> Hi Jonh,
> >>>
> >>> Thank you very much for explaining how WindowStore works. I have some
> >>> more questions...
> >>>
> >>> On 1/10/19 5:33 PM, John Roes

Producer RecordMetadata offset

2019-01-24 Thread Eymeric Guyot
Hello,

In my application when I send hundreds of thousands of messages I use the
Metadata in the callback to save the offset of the record for future usage.
However sometimes in something like 1% of the cases the metadata.offset()
returns -1 which makes things hard for me later as I can't find them. Why
does this happen and is there a way to get the offset without consuming the
topic to find it.

Thank you.
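A side note on the -1 offsets: RecordMetadata.offset() is -1 whenever the producer did not receive an offset back from the broker. That happens with acks=0 (fire-and-forget), and also when a send fails, in which case the callback's exception argument is non-null and should be checked. If fire-and-forget matches your producer config, requiring acknowledgement makes the offset available (a hedged config sketch, at the cost of some latency):

```properties
# With acks=0 the broker never reports the assigned offset, so
# RecordMetadata.offset() returns -1. acks=1 or acks=all returns it.
acks=all
```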



Re: Broker continuously expand and shrinks to itself

2019-01-24 Thread Harsha Chintalapani
Hi Ashish,
           What's your replica.lag.time.max.ms set to, and do you see any network 
issues between brokers?
-Harsha



On Jan 22, 2019, 10:09 PM -0800, Ashish Karalkar 
, wrote:
> Hi All,
> We just upgraded from 0.10.x to 1.1 and enabled rack awareness on an existing 
> clusters which has about 20 nodes in 4 rack . After this we see that few 
> brokers goes on continuous expand and shrink ISR to itself  cycle , it is 
> also causing high time for serving meta data requests.
> What is the impact of enabling rack awareness on existing cluster assuming 
> replication factor is 3 and all existing replica may or may not be in 
> different rack when rack awareness was enabled after which a rolling bounce 
> was done.
> Symptoms we are having are replica lag and slow metadata requests. Also in 
> brokers log we continuously see disconnection from the broker where it is 
> trying to expand.
> Thanks for helping
> --A


Re: Deploying Kafka topics in a kerberized Zookeeper without superuser (in a CI flow)

2019-01-24 Thread Harsha Chintalapani
Hi,
      When you kerberize Kafka and enable zookeeper.set.acl, all the 
zookeeper nodes created under the zookeeper root carry ACLs that allow only the 
Kafka broker's principal. Topic creation with the kafka-topics.sh script goes 
directly to zookeeper, i.e., it creates a zookeeper node under /brokers/topics, 
and that node needs the Kafka broker's principal set as its ACL. If you use any 
other principal, you will see issues like the ones you describe.
      One option is to disable zookeeper.set.acl, but that means anyone with 
access to zookeeper can create a topic. A better option is to use the 
KafkaAdminClient's createTopics, which sends a CreateTopicsRequest through the 
brokers, where it can be authorized. Your CI can have its own principal, and you 
can define an authorization policy that allows this principal to create topics.
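If you go the broker-authorized route, the Create permission could be granted to the CI principal with kafka-acls.sh (a sketch; the principal name User:ci and the zookeeper address are assumptions):

```shell
# Allow the CI principal to create topics anywhere in the cluster.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:ci \
  --operation Create --cluster
```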

-Harsha

On Jan 21, 2019, 3:00 AM -0800, Kristjan Peil , wrote:
> I'm running Kafka 1.1.1 and Zookeeper 3.4.6 in a cluster, both guarded by
> Kerberos. My app stack includes a module containing topic configurations,
> and my continuous integration build autodeploys changes to topics with
> kafka-topics.sh and kafka-configs.sh.
>
> When I try to use a non-superuser principal to authenticate in the scripts,
> the topic metadata is created by kafka-topics.sh in Zookeeper in such a way
> that Kafka cannot process it to create the actual topics in Kafka brokers -
> partitions are not created in the broker. Also, running kafka-configs.sh to
> alter configs of existing topics gets "NoAuth for /configs/".
>
> When I authenticate with the superuser principal "kafka" then everything
> works fine. But making the "kafka" superuser credentials available in CI
> context seems unsecure.
>
> Is it possible to use kafka-topics.sh and kafka-configs.sh in a kerberized
> environment with a non-superuser Kerberos principal and how can this be
> made to happen?
> Can you suggest an alternate solution to achieve CI for Kafka topics?
>
> Best regards,
> Kristjan Peil


Kafka Broker OOM

2019-01-24 Thread Ming Zhang
Hi All,

I encountered a Kafka OOM once, so I added a monitor for Kafka's heap memory.

Here is the current status: I set the Kafka heap max to 96G, and you can see
it changing significantly. Can anyone help point out where the problem is?
Thanks in advance.

kafka version: 1.0.0

VM parameters:
-Xmx96G -Xms96G -XX:+UseG1GC -XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent
-Djava.awt.headless=true
-Xloggc:/home/ec2-user/kafka_2.11-1.0.0/bin/../logs/kafkaServer-gc.log
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps
-XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=100M
some configuration:
max.message.bytes=104857600
replica.fetch.max.bytes=104857600
max.request.size=104857600
fetch.message.max.bytes=104857600



Kafka consumer configuration to minimize rebalance time

2019-01-24 Thread Marcos Juarez
One of our internal customers is working on a service that spans around 120
kubernetes pods.  Due to design constraints, every one of these pods has a
single kafka consumer, and they're all using the same consumer group id.
Since it's kubernetes, and the service is sized according to volume
throughout the day, pods are added/removed constantly, at least a few times
per hour.

What we are seeing with initial testing is that, whenever a single pod
joins or leaves the consumer group, it triggers a rebalance that sometimes
takes up to 60+ seconds to resolve.  Consumption resumes after the
rebalance event, but of course now there's 60+ second lag in consumption
for that topic.  Whenever there's a code deploy to these pods, and we need
to re-create all 120 pods, the problem seems to be exacerbated, and we run
into rebalances taking 200+ seconds.  This particular service is somewhat
sensitive to lag, so we'd like to keep the rebalance time to a minimum.

With that context, what kafka configs should we focus on on the consumer
side (and maybe the broker side?) that would enable us to minimize the time
spent on the rebalance?

Thanks,

Marcos Juarez


Re: Kafka consumer configuration to minimize rebalance time

2019-01-24 Thread Harsha Chintalapani
Hi Marcos,
           I think what you need is static membership, which reduces the number of 
rebalances required. There is active discussion and work in progress for this in KIP-345: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

-Harsha
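In the meantime, rebalance behavior is mostly governed by a handful of consumer timeouts, plus one broker-side delay for newly created groups. A hedged sketch; the values below are illustrative assumptions, not recommendations:

```properties
# Consumer side: failure-detection speed vs. tolerance for slow polls.
session.timeout.ms=10000
heartbeat.interval.ms=3000
max.poll.interval.ms=300000

# Broker side: how long the coordinator waits before the first
# rebalance of a newly formed group, so members can join in one batch.
group.initial.rebalance.delay.ms=3000
```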

On Jan 24, 2019, 9:51 AM -0800, Marcos Juarez , wrote:
> One of our internal customers is working on a service that spans around 120
> kubernetes pods. Due to design constraints, every one of these pods has a
> single kafka consumer, and they're all using the same consumer group id.
> Since it's kubernetes, and the service is sized according to volume
> throughout the day, pods are added/removed constantly, at least a few times
> per hour.
>
> What we are seeing with initial testing is that, whenever a single pod
> joins or leaves the consumer group, it triggers a rebalance that sometimes
> takes up to 60+ seconds to resolve. Consumption resumes after the
> rebalance event, but of course now there's 60+ second lag in consumption
> for that topic. Whenever there's a code deploy to these pods, and we need
> to re-create all 120 pods, the problem seems to be exacerbated, and we run
> into rebalances taking 200+ seconds. This particular service is somewhat
> sensitive to lag, so we'd like to keep the rebalance time to a minimum.
>
> With that context, what kafka configs should we focus on on the consumer
> side (and maybe the broker side?) that would enable us to minimize the time
> spent on the rebalance?
>
> Thanks,
>
> Marcos Juarez




Re: Open files clogging and KafkaStreams

2019-01-24 Thread Niklas Lönn
Hi.

I have something good (and personally mysterious) to report.

We do indeed run 1.1.x in production.

And today, when I was almost finished cleaning up my test case for public
display, I had been forced by corp policies to update macOS, and suddenly,
when I had my test in a "non-hacky improvised piece of iteration test code
not asserting stuff" mode, I couldn't recreate the issue any more, not with
the new or the old code.

I suspect I was unlucky to hit some other issue in my os/firmware having
very similar symptoms as we had in production, ran my test on another
computer without this update and it was fine there as well.

I guess that concludes that you are most likely very right with this 1.1
bug and I was super unlucky to be able to recreate it locally due to other
issues.

Thanks for the support and rubber ducking :)

Kind regards
Niklas

On Thu 24. Jan 2019 at 02:08, Guozhang Wang  wrote:

> I see (btw attachments are usually not allowed in AK mailing list, but if
> you have it somewhere like gitcode and can share the url that works).
>
> Could you let me know how many physical cores do you have in total hosting
> your app and how many threads did you configure? From your current
> description there should have at least 40 tasks (20 reading from source
> topics and writing to repartition topics, and 20 reading from repartition
> topics), and I'd like to know how are these tasks be assigned to threads,
> and how many threads may be executed in parallel from the hardware.
>
>
> Guozhang
>
>
> On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn  wrote:
>
> > I have to double check what version of broker we run in production but
> when
> > testing and verifying the issue locally I did reproduce it with both
> broker
> > and client version 2.1.0
> >
> > Kind regards
> > Niklas
> >
> > On Wed 23. Jan 2019 at 18:24, Guozhang Wang  wrote:
> >
> > > I see.
> > >
> > > What you described is a known issue in the older version of Kafka, that
> > > some high traffic topics in the bootstrap mode may effectively "starve"
> > > other topics in the fetch response, since brokers used to naively fill
> in
> > > the bytes that meets the max.bytes configuration and returns. This is
> > fixed
> > > in 1.1 version via incremental fetch request:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > >
> > > The basic idea is to not always request topics like A,B,C; instead if
> the
> > > previous request asks for topics A,B,C and got all data from A, then
> next
> > > request would be B,C,A, etc. So if you are on older versions of Kafka
> I'd
> > > suggest you upgrade to newer version.
> > >
> > > If you cannot upgrade atm, another suggest as I mentioned above is to
> > > change the segment sizes so you can have much larger, and hence fewer
> > > segment files.
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn 
> > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > I think I went a bit ahead of myself in describing the situation, I
> had
> > > an
> > > > attachment with the context in detail, maybe it was filtered out.
> Lets
> > > try
> > > > again =)
> > > >
> > > > We have a topology looking something like this:
> > > >
> > > > input-topic[20 partitions, compacted]
> > > > |
> > > > use-case-repartition[20 partitions, infinite retention, segment.ms
> > > =10min]
> > > > |
> > > > use-case-changelog
> > > >
> > > > We have previously hit the TooManyOpenFiles issue and "solved" it by
> > > > raising the bar to something extreme.
> > > > Later we found out that we wanted rep factor 3 on all internal
> topics,
> > so
> > > > we reset the app and BOOM, now we hit a too many memory mapped files
> > > limit
> > > > instead
> > > >
> > > > the input topic contains 30 days of data, where we pretty much have
> > > records
> > > > in every 10minute window for every partition.
> > > > This means if nothing consumes the repartition topic we will have 6
> (10
> > > min
> > > > slots) * 24 hours * 30 days * 20 partitions * 3 (.index .log
> .timeindex
> > > > files) * 3 replication factor / 5 brokers in cluster = *155.520 *open
> > > files
> > > > just to have this repartition topic in place.
> > > >
> > > > You would say, yeah but no problem as it would be deleted and you
> would
> > > not
> > > > reach such high numbers? But doesn't seem to be the case.
> > > > What happened in our case is that, due to how the broker multiplexes
> > the
> > > > topic partitions for the subscribers, the streams application piled
> up
> > > all
> > > > the repartition records, and only when caught up, all the downstream
> > > > processes started taking place. I do see this as a design flaw in
> some
> > > > component, probably the broker. It cant be the desired behaviour. How
> > > many
> > > > open files do I need to be able to have open in a year of data when
> > > > resetting/reprocessing an application?
> > > >
> > > > B
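The open-file arithmetic quoted above can be checked with a quick sketch (figures taken from the thread; the per-segment file count of 3 assumes only .log, .index and .timeindex files):

```javascript
// Estimate open segment files per broker for one topic:
// segments per partition * partitions * files per segment *
// replication factor, spread evenly across the brokers.
function openFilesPerBroker({ segmentsPerHour, hours, days, partitions,
                              filesPerSegment, replicationFactor, brokers }) {
  const segmentsPerPartition = segmentsPerHour * hours * days;
  return (segmentsPerPartition * partitions * filesPerSegment *
          replicationFactor) / brokers;
}

// Figures from the thread: 10-minute segments (6 per hour) over 30 days,
// 20 partitions, replication factor 3, 5 brokers.
const estimate = openFilesPerBroker({
  segmentsPerHour: 6, hours: 24, days: 30,
  partitions: 20, filesPerSegment: 3,
  replicationFactor: 3, brokers: 5,
});
console.log(estimate); // 155520
```

Raising segment.ms (fewer, larger segments) divides the first factor directly, which matches the advice given earlier in the thread.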

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-24 Thread Peter Levart

Hi John,

On 1/24/19 3:18 PM, John Roesler wrote:

Hi Peter,

Thanks for the clarification.

When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
think that Streams automatically registers a shutdown hook. In our examples
and demos, we register a shutdown hook "outside" of streams (right next to
the code that calls start() ).
Unless I missed something, a SIGTERM would still cause Streams to exit
abruptly, skipping flush and commit. This can cause apparent duplicates *if
you're not using EOS or if you're reading uncommitted transactions*.


The fact is that Spring, which I use to instantiate the KafkaStreams
object, does exactly that:


    @Bean(initMethod = "start", destroyMethod = "close")
    public KafkaStreams processorStreams(...

...so when the JVM gets SIGTERM, the shutdown hook that Spring installs
shuts down the ApplicationContext, which calls all "destroyMethod"(s) on
registered Bean(s)...


And the duplicates are less apparent but still occur, even in EOS mode...
But they are not actual duplicates. They are duplicates only by
windowed key; the values are different...




The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was emitted.
Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...

So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.


So when EOS is enabled, the output topics are used in a transactional
manner. The consumer of such a topic should then enable read_committed
semantics...


That would do if my problem were about seeing duplicates of final
windowing results. That is not my problem. My problem is that upon
restart of the processor, I see some non-final window aggregations, followed
by final aggregations for the same windowed key. That's harder to
tolerate in an application. If it were just duplicates of the "correct"
aggregation, I could ignore the 2nd and subsequent messages for the same
windowed key, but if I first get a non-final aggregation, I cannot simply
ignore the 2nd occurrence of the same windowed key. I must cope with
"replacing the previous aggregation with a new version of it" in the app.
Meaning that suppression of non-final results does not buy me anything,
as it is not guaranteeing that.


Is it possible that non-final windowed aggregations are emitted in some
scenario, but that such a transaction is then rolled back, so I would not see
the non-final aggregations if I enabled read_committed isolation on the consumer?


I think I'll have to reinstate the demo and try that...

Stay tuned.

Regards, Peter



Likewise, if you were to attach a callback, like "foreach" downstream of
the suppression, you would see duplicates in the case of a crash. Callbacks
are a general "hole" in EOS, which I have some ideas to close, but that's a
separate topic.

There may still be something else going on, but I'm trying to start with
the simpler explanations.

Thanks again,
-John

Thanks,
-John

On Wed, Jan 23, 2019 at 5:11 AM Peter Levart  wrote:


Hi John,

Sorry I haven't had time to prepare the minimal reproducer yet. I still
have plans to do it though...

On 1/22/19 8:02 PM, John Roesler wrote:

Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?

I start it as SpringBoot application from IDEA and then stop it with the
red square button. It does initiate the shutdown sequence before
exiting... So I think it is by SIGTERM which initiates JVM shutdown
hook(s).


* have you tried this with EOS enabled? I can imagine some ways that

there

could be duplicates, but they should be impossible with EOS enabled.

Yes, I have EOS enabled.


Thanks for your help,
-John

Regards, Peter


On Mon, Jan 14, 2019 at 1:20 PM John Roesler  wrote:


Hi Peter,

I see your train of thought, but the actual implementation of the
window store is structured differently from your mental model.
Unlike Key/Value stores, we know that the records in a window
store will "expire" on a regular schedule, and also that every single
record will eventually expire. With this in mind, we 

Re: Open files clogging and KafkaStreams

2019-01-24 Thread Guozhang Wang
Not a problem. Glad that you've not seen it anymore now.

If it occurs again please feel free to reach out to the community again.


Guozhang

On Thu, Jan 24, 2019 at 2:32 PM Niklas Lönn  wrote:

> Hi.
>
> I have something good (and personally mysterious) to report.
>
> We do indeed run 1.1.x in production.
>
>  And today when I was almost finished cleaning up my test case for public
> display, I had been forced by corp policies to update osx, and suddenly
> when I had my test in a "non hacky improvised piece of iteration test code
> not asserting stuff" mode, I couldn't recreate the issue any more, not with
> the new or the old code.
>
> I suspect I was unlucky to hit some other issue in my os/firmware having
> very similar symptoms as we had in production, ran my test on another
> computer without this update and it was fine there as well.
>
> I guess that concludes that you are most likely very right with this 1.1
> bug and I was super unlucky to be able to recreate it locally due to other
> issues.
>
> Thanks for the support and rubber ducking :)
>
> Kind regards
> Niklas
>
> On Thu 24. Jan 2019 at 02:08, Guozhang Wang  wrote:
>
> > I see (btw attachments are usually not allowed in AK mailing list, but if
> > you have it somewhere like gitcode and can share the url that works).
> >
> > Could you let me know how many physical cores do you have in total
> hosting
> > your app and how many threads did you configure? From your current
> > description there should have at least 40 tasks (20 reading from source
> > topics and writing to repartition topics, and 20 reading from repartition
> > topics), and I'd like to know how are these tasks be assigned to threads,
> > and how many threads may be executed in parallel from the hardware.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn 
> wrote:
> >
> > > I have to double check what version of broker we run in production but
> > when
> > > testing and verifying the issue locally I did reproduce it with both
> > broker
> > > and client version 2.1.0
> > >
> > > Kind regards
> > > Niklas
> > >
> > > On Wed 23. Jan 2019 at 18:24, Guozhang Wang 
> wrote:
> > >
> > > > I see.
> > > >
> > > > What you described is a known issue in the older version of Kafka,
> that
> > > > some high traffic topics in the bootstrap mode may effectively
> "starve"
> > > > other topics in the fetch response, since brokers used to naively
> fill
> > in
> > > > the bytes that meets the max.bytes configuration and returns. This is
> > > fixed
> > > > in 1.1 version via incremental fetch request:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > >
> > > > The basic idea is to not always request topics like A,B,C; instead if
> > the
> > > > previous request asks for topics A,B,C and got all data from A, then
> > next
> > > > request would be B,C,A, etc. So if you are on older versions of Kafka
> > I'd
> > > > suggest you upgrade to newer version.
> > > >
> > > > If you cannot upgrade atm, another suggest as I mentioned above is to
> > > > change the segment sizes so you can have much larger, and hence fewer
> > > > segment files.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn 
> > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > I think I went a bit ahead of myself in describing the situation, I
> > had
> > > > an
> > > > > attachment with the context in detail, maybe it was filtered out.
> > Lets
> > > > try
> > > > > again =)
> > > > >
> > > > > We have a topology looking something like this:
> > > > >
> > > > > input-topic[20 partitions, compacted]
> > > > > |
> > > > > use-case-repartition[20 partitions, infinite retention, segment.ms
> > > > =10min]
> > > > > |
> > > > > use-case-changelog
> > > > >
> > > > > We have previously hit the TooManyOpenFiles issue and "solved" it
> by
> > > > > raising the bar to something extreme.
> > > > > Later we found out that we wanted replication factor 3 on all
> > > > > internal topics, so we reset the app and BOOM, now we hit a "too
> > > > > many memory-mapped files" limit instead.
> > > > >
> > > > > The input topic contains 30 days of data, where we pretty much have
> > > > > records in every 10-minute window for every partition.
> > > > > This means if nothing consumes the repartition topic we will have
> > > > > 6 (10-min slots) * 24 hours * 30 days * 20 partitions * 3 (.index,
> > > > > .log, .timeindex files) * 3 replication factor / 5 brokers in the
> > > > > cluster = *155,520* open files just to have this repartition topic
> > > > > in place.
> > > > >
> > > > > You would say: yeah, no problem, as it would be deleted and you
> > > > > would not reach such high numbers? But that doesn't seem to be the
> > > > > case.
> > > > > What happened in our case is that, due to how the broker
> > > > > multiplexes the
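
Niklas's open-file estimate above can be reproduced with quick
back-of-the-envelope arithmetic (figures taken from his message; variable
names are illustrative):

```python
# Segment files a never-cleaned repartition topic can pin open per broker,
# using the numbers from the message above.
slots_per_hour = 6        # segment.ms = 10 min
hours = 24
days = 30
partitions = 20
files_per_segment = 3     # .log, .index, .timeindex
replication_factor = 3
brokers = 5

segments = slots_per_hour * hours * days * partitions
open_files_per_broker = (
    segments * files_per_segment * replication_factor // brokers
)
print(open_files_per_broker)  # 155520
```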

Re: High system.io.await on Kafka brokers?

2019-01-24 Thread wenxing zheng
Thanks, Sam. We are still evaluating.

On 2019/01/22 09:50:53, Sam Pegler  wrote: 
> Sounds like you're reaching the limits of what your disks will do, either on
> reads or writes. Debug it as you would any other disk-based app;
> https://haydenjames.io/linux-server-performance-disk-io-slowing-application/
> might help.
> 
> 
> 
> On Tue, 22 Jan 2019 at 09:19, wenxing zheng  wrote:
> 
> > Dear all,
> >
> > We have a Kafka cluster with 5 nodes, and from the Datadog metrics we
> > found that the latency for sending to Kafka was regularly more than
> > 200ms, and that there was a peak in system.io.await.
> >
> > Please help to advice what would be the problem and any hints?
> > [image: image.png]
> >
> > Kind regards, Wenxing
> >
> 


Newbie question using Kafka Producers in Web apps

2019-01-24 Thread Raghavendran Chellappa
Hi All,
We have a Spring based web app.

We are planning to build an 'Audit Tracking' feature and plan to use Kafka
as a sink for storing audit messages (which will then be consumed and
persisted to a common DB).



We are planning to build a simple, ‘pass-through’ REST service which will
take a JSON and push it into the appropriate Kafka topic.

This REST service will be called from various pages (from server side) in
the web app (during Create, View, Edit actions) to store the Audit entries.



My question is can we directly have Kafka Producers in the web app so that
we post messages to Kafka Topic directly (instead of going through a
Webservice)?

Will adding a Kafka producer to the web app make it less stable (make
pages less performant)? This is one of the reasons why we want to hide the
Kafka producer complexity behind the webservice. Also we feel that this
webservice can be a starting point for a generic “Auditing service” that
can be used by other applications, in the enterprise, in the future.



I think the ‘pass-through’ webservice is not required and it is OK to push
messages directly from the web app to Kafka (though I am unable to point to
any examples of this being done or any benefits of doing so).



What do you think?



Thanks,

Ragha
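
One middle ground between "producer in the app" and "separate REST service"
is to keep the producer in-process but decouple it from the request path
with a small in-memory queue, so page handlers never wait on the broker.
A minimal sketch with a stubbed producer (class and topic names are
illustrative; a real client such as kafka-python's KafkaProducer already
does its own background batching):

```python
import queue
import threading

class AuditLogger:
    """Fire-and-forget audit sink: request handlers enqueue and return
    immediately; a background thread hands records to the producer."""
    def __init__(self, producer, topic):
        self._producer = producer   # anything with send(topic, value)
        self._topic = topic
        self._q = queue.Queue()
        threading.Thread(target=self._drain, daemon=True).start()

    def log(self, event):
        # Called from web request handlers; never blocks on Kafka.
        self._q.put(event)

    def _drain(self):
        while True:
            event = self._q.get()
            self._producer.send(self._topic, event)
            self._q.task_done()

class StubProducer:
    """Stand-in for a real Kafka client, for illustration only."""
    def __init__(self):
        self.sent = []
    def send(self, topic, value):
        self.sent.append((topic, value))

audit = AuditLogger(StubProducer(), "audit-events")
audit.log({"action": "edit", "user": "alice"})
audit._q.join()  # wait for the background thread (only needed in tests)
```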


Re: Newbie question using Kafka Producers in Web apps

2019-01-24 Thread Michael Eugene
I don’t feel it would be a big performance hit, because Kafka producers work
very fast; the speed difference would be negligible. Why are you worried
about stability? I’m just curious, because it doesn’t seem like it would be
unstable, though it might be a bit of overkill for one app, and some
decoupling might make sense.

Sent from my iPhone

> On Jan 24, 2019, at 9:59 PM, Raghavendran Chellappa 
>  wrote:
> 
> Hi All,
> We have a Spring based web app.
> 
> We are planning to build an 'Audit Tracking' feature and plan to use Kafka
> - as a sink for storing Audit messages (which will then be consumed and
> persisted to a common DB).
> 
> 
> 
> We are planning to build a simple, ‘pass-through’ REST service which will
> take a JSON and push it into the appropriate Kafka topic.
> 
> This REST service will be called from various pages (from server side) in
> the web app (during Create, View, Edit actions) to store the Audit entries.
> 
> 
> 
> My question is can we directly have Kafka Producers in the web app so that
> we post messages to Kafka Topic directly (instead of going through a
> Webservice)?
> 
> Will adding a Kafka Producer in web app will make the app less stable (make
> pages less performant)? This is one of the reasons why we want to hide the
> Kafka producer complexity behind the webservice. Also we feel that this
> webservice can be a starting point for a generic “Auditing service” that
> can be used by other applications, in the enterprise, in the future.
> 
> 
> 
> I think the ‘pass-through’ webservice is not required and it is OK to push
> messages directly from web app to Kafka (but unable to point to any
> examples of this being done or any benefits of doing so).
> 
> 
> 
> What do you think?
> 
> 
> 
> Thanks,
> 
> Ragha


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-24 Thread Peter Levart

Hi John,

I haven't been able to reinstate the demo yet, but I have been re-reading
the following scenario of yours:


On 1/24/19 11:48 PM, Peter Levart wrote:

Hi John,

On 1/24/19 3:18 PM, John Roesler wrote:



The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was 
emitted.

Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...

So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe 
are

in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
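
That point can be illustrated with a toy model of the scenario above: the
first emit of X lands in aborted transaction T1, the re-emit in committed
T2. A read_committed consumer sees X once; a read_uncommitted consumer sees
it twice. (This only simulates consumer isolation semantics; it is not real
client code, and all names are illustrative.)

```python
# (offset, txn_id, value) records in the output topic.
records = [
    (0, "T1", "X"),   # emitted, then the app crashed; T1 was aborted
    (1, "T2", "X"),   # re-emitted after restart; T2 was committed
]
txn_outcome = {"T1": "aborted", "T2": "committed"}

def consume(records, isolation):
    """Return the values a consumer with the given isolation level sees."""
    if isolation == "read_uncommitted":
        return [v for _, _, v in records]
    return [v for _, txn, v in records if txn_outcome[txn] == "committed"]

print(consume(records, "read_uncommitted"))  # ['X', 'X'] duplicates visible
print(consume(records, "read_committed"))    # ['X'] aborted emit filtered out
```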


...and I was thinking that perhaps the right solution to the suppression 
problem would be to use transactional producers for the resulting output 
topic AND the store change-log. Is this possible? Does the compaction of 
the log on the brokers work for transactional producers as expected? In 
that case, the sending of the final result and the marking of that fact in
the store changelog would together be an atomic operation.
That said, I think there's another problem with suppression: it looks as
if the suppression processor is already processing input while the state
store has not been fully restored yet, or something related... Is this
guaranteed not to happen?


And now something unrelated I wanted to ask...

I'm trying to create my own custom state store. From the API I can see
it is pretty straightforward. One thing that I don't quite understand is
how Kafka Streams knows whether to replay the whole changelog after the
store registers itself, or just a part of it, and which part (from which
offset per partition). There doesn't seem to be any API point through
which the store could communicate this information back to Kafka
Streams. Is such bookkeeping performed outside the store? Does Kafka
Streams first invoke flush() on the store and then note down the
offsets from the changelog producer somewhere? So that next time the
store is brought up, the log is only replayed from the last noted-down
offset? It could then happen that the store gets some log entries that
have already been incorporated into it (from the point of one flush
before), but it never misses any... In any case, there has to be an
indication somewhere that the store didn't survive and has to be rebuilt
from scratch. How does Kafka Streams detect that situation? By placing
some marker file into the directory reserved for the store's local
storage?
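
A toy model of the restore semantics being asked about, assuming the
bookkeeping lives outside the store (Kafka Streams checkpoints flushed
changelog offsets to a local .checkpoint file in the task's state
directory, and a missing checkpoint triggers a rebuild from scratch).
Replaying from a checkpoint may re-apply records that were already
flushed, which is safe because a put is an idempotent overwrite:

```python
class ToyStore:
    """Toy key-value store restored from a changelog. Replay resumes from a
    checkpointed offset, so records may be applied more than once, which is
    harmless: a put is an idempotent overwrite."""
    def __init__(self):
        self.data = {}

    def restore(self, changelog, from_offset):
        for offset, key, value in changelog:
            if offset >= from_offset:
                self.data[key] = value

# (offset, key, value) records in the store's changelog topic.
changelog = [(0, "a", 1), (1, "b", 2), (2, "a", 3)]

s = ToyStore()
s.restore(changelog, from_offset=0)   # initial full restore
snapshot = dict(s.data)

# Suppose the checkpoint was one flush behind: offsets 1..2 are replayed
# again; the store ends up in the same state, never missing an entry.
s.restore(changelog, from_offset=1)
print(s.data == snapshot)  # True
```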


Regards, Peter