Re: Can kafka internal state be purged ?

2019-06-21 Thread John Roesler
No problem. It's definitely a subtlety. It occurs because each
partition is processed completely independently of the others, so
"stream time" is tracked per partition, and there's no way to look
across at the other partitions to find out what stream time they have.

In general, it's not a problem because you'd expect all partitions to
receive updates over time, but if you're specifically trying to send
events that cause stuff to get flushed from the buffers, it can mess
with you. It's especially notable in tests. So, for most tests, I just
configure the topics to have one partition.

-John
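
To make the per-partition subtlety concrete, here is a minimal, hypothetical sketch of the workaround John describes: producing one fresh-timestamped record into *each* partition so stream time advances everywhere. The topic name, serializer choices, and the "tick" key scheme are assumptions for illustration, not anything from this thread.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AdvanceStreamTime {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        String topic = "input-events"; // hypothetical input topic
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            long now = System.currentTimeMillis();
            int partitions = producer.partitionsFor(topic).size();
            // Stream time is tracked per partition, so a fresh record must
            // land in every partition before all buffered windows can close.
            for (int p = 0; p < partitions; p++) {
                producer.send(new ProducerRecord<>(topic, p, now, "tick-" + p, "advance"));
            }
            producer.flush();
        }
    }
}

Downstream logic would need to filter out these synthetic "tick" records; for tests, a single-partition topic avoids the whole dance, as noted above.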

On Fri, Jun 21, 2019 at 3:56 PM Parthasarathy, Mohan  wrote:
>
> That change, "in the same partition", must explain what we are seeing. Unless
> one message arrives in every partition, not all windows will expire. That is
> an interesting twist. Thanks for the correction (I will go back and confirm
> this).
>
> -mohan
>
>
> On 6/21/19, 12:40 PM, "John Roesler"  wrote:
>
> Sure, the record cache attempts to save downstream operators from
> unnecessary updates by also buffering for a short amount of time
> before forwarding. It forwards results whenever the cache fills up or
> whenever there is a commit. If you're happy to wait at least "commit
> interval" amount of time for updates, then you don't need to do
> anything, but if you're on the edge of your seat, waiting for these
> results, you can set cache.max.bytes.buffering to 0 to disable the
> record cache entirely. Note that this would hurt throughput in
> general, though.
>
> Just a slight modification:
> * a new record with new timestamp > (all the previous timestamps +
> grace period) will cause all the old windows *in the same partition*
> to close
> * yes, expiry of the window depends only on the event time
>
> Hope this helps!
> -John
>
> On Thu, Jun 20, 2019 at 11:42 AM Parthasarathy, Mohan  wrote:
> >
> > Could you tell me a little more about the delays caused by the record
> > caches and how I can disable them?
> >
> > If I could summarize my problem:
> >
> > - If a new record arrives with a timestamp > all records sent before, I
> > expect *all* of the old windows to close
> > - Expiry of the windows depends only on the event time and not on the key
> >
> > Are these two statements correct?
> >
> > Thanks
> > Mohan
> >
> > On 6/20/19, 9:17 AM, "John Roesler"  wrote:
> >
> > Hi!
> >
> > In addition to setting the grace period to zero (or some small
> > number), you should also consider the delays introduced by record
> > caches upstream of the suppression. If you're closely watching the
> > timing of records going into and coming out of the topology, this
> > might also spoil your expectations. You could always disable the
> > record cache to make the system more predictable (although this
> > would hurt throughput in production).
> >
> > Thanks,
> > -John
> >
> > On Wed, Jun 19, 2019 at 3:01 PM Parthasarathy, Mohan  wrote:
> > >
> > > We do explicitly set the grace period to zero. I am going to try
> > > the new version
> > >
> > > -mohan
> > >
> > >
> > > On 6/19/19, 12:50 PM, "Parthasarathy, Mohan"  wrote:
> > >
> > > Thanks. We will give it a shot.
> > >
> > > On 6/19/19, 12:42 PM, "Bruno Cadonna"  wrote:
> > >
> > > Hi Mohan,
> > >
> > > I realized that my previous statement was not clear. With a grace
> > > period of 12 hours, suppress would wait for late events until stream
> > > time has advanced 12 hours before a result would be emitted.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Wed, Jun 19, 2019 at 9:21 PM Bruno Cadonna  wrote:
> > > >
> > > > Hi Mohan,
> > > >
> > > > If you do not set a grace period, the grace period defaults to 12
> > > > hours. Hence, suppress would wait for an event that occurs 12 hours
> > > > later before it outputs a result. Try to explicitly set the grace
> > > > period to 0 and let us know if it worked.
> > > >
> > > > If it still does not work, upgrade to version 2.2.1 if it is possible
> > > > for you. We had a couple of bugs in suppress recently that are fixed
> > > > in that version.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan  wrote:
> > > > >
> > > > > No, I have not set any grace

Re: Can kafka internal state be purged ?

2019-06-21 Thread John Roesler
Sure, the record cache attempts to save downstream operators from
unnecessary updates by also buffering for a short amount of time
before forwarding. It forwards results whenever the cache fills up or
whenever there is a commit. If you're happy to wait at least "commit
interval" amount of time for updates, then you don't need to do
anything, but if you're on the edge of your seat, waiting for these
results, you can set cache.max.bytes.buffering to 0 to disable the
record cache entirely. Note that this would hurt throughput in
general, though.

Just a slight modification:
* a new record with new timestamp > (all the previous timestamps +
grace period) will cause all the old windows *in the same partition*
to close
* yes, expiry of the window depends only on the event time

Hope this helps!
-John
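
For readers wanting to see these knobs in one place, here is a minimal sketch combining a zero grace period, final-results suppression, and a disabled record cache. The application id, topic name, and the 5-minute window size are assumptions for illustration.

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo"); // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable the record cache: every update is forwarded immediately.
        // This hurts throughput in production but makes timing predictable.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Flushes also happen on commit; a short interval reduces waiting.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-events") // hypothetical topic
               .groupByKey()
               // Grace 0: a window can close as soon as stream time (event
               // time, tracked per partition) passes the window end.
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO))
               .count()
               // Emit exactly one final result per window, once it closes.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .foreach((windowedKey, count) ->
                       System.out.println(windowedKey + " -> " + count));

        new KafkaStreams(builder.build(), props).start();
    }
}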

On Thu, Jun 20, 2019 at 11:42 AM Parthasarathy, Mohan  wrote:
>
> Could you tell me a little more about the delays caused by the record
> caches and how I can disable them?
>
> If I could summarize my problem:
>
> - If a new record arrives with a timestamp > all records sent before, I
> expect *all* of the old windows to close
> - Expiry of the windows depends only on the event time and not on the key
>
> Are these two statements correct?
>
> Thanks
> Mohan
>
> On 6/20/19, 9:17 AM, "John Roesler"  wrote:
>
> Hi!
>
> In addition to setting the grace period to zero (or some small
> number), you should also consider the delays introduced by record
> caches upstream of the suppression. If you're closely watching the
> timing of records going into and coming out of the topology, this
> might also spoil your expectations. You could always disable the
> record cache to make the system more predictable (although this would
> hurt throughput in production).
>
> Thanks,
> -John
>
> On Wed, Jun 19, 2019 at 3:01 PM Parthasarathy, Mohan  wrote:
> >
> > We do explicitly set the grace period to zero. I am going to try the
> > new version
> >
> > -mohan
> >
> >
> > On 6/19/19, 12:50 PM, "Parthasarathy, Mohan"  wrote:
> >
> > Thanks. We will give it a shot.
> >
> > On 6/19/19, 12:42 PM, "Bruno Cadonna"  wrote:
> >
> > Hi Mohan,
> >
> > I realized that my previous statement was not clear. With a grace
> > period of 12 hours, suppress would wait for late events until stream
> > time has advanced 12 hours before a result would be emitted.
> >
> > Best,
> > Bruno
> >
> > On Wed, Jun 19, 2019 at 9:21 PM Bruno Cadonna  wrote:
> > >
> > > Hi Mohan,
> > >
> > > If you do not set a grace period, the grace period defaults to 12
> > > hours. Hence, suppress would wait for an event that occurs 12 hours
> > > later before it outputs a result. Try to explicitly set the grace
> > > period to 0 and let us know if it worked.
> > >
> > > If it still does not work, upgrade to version 2.2.1 if it is possible
> > > for you. We had a couple of bugs in suppress recently that are fixed
> > > in that version.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan  wrote:
> > > >
> > > > No, I have not set any grace period. Is that mandatory? Have you
> > > > seen problems with suppress and windows expiring?
> > > >
> > > > Thanks
> > > > Mohan
> > > >
> > > > On 6/19/19, 12:41 AM, "Bruno Cadonna"  wrote:
> > > >
> > > > Hi Mohan,
> > > >
> > > > Did you set a grace period on the window?
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan  wrote:
> > > > >
> > > > > On further debugging, what we are seeing is that windows are
> > > > > expiring rather randomly as new messages are being processed. We
> > > > > tested with a new key for every new message. We waited for the
> > > > > window time before replaying new messages. Sometimes a new message
> > > > > would come in and create state. It takes several messages to make
> > > > > some of the old windows close (go past suppress to the next stage).
> > > > > We have also seen a case where one of them never closed even though
> > > > > several other older ones expired. Then we explicitly sent a message
> > > > > with the same old key, and then it showed up. Also, for every new
> > > > > message, only one of the previous windows expires even though there
> > > > > are several pending.
> > > > >
> > > > > If we don't use suppress, then there is never an issue. With
> > > > > suppress, the behavior we are seeing is weird. We are

Re: Kafka Streams - offset number affected by "exactly once processing guarantee"

2019-06-21 Thread Trinity Xia
Thank you so much for the detailed explanation, Matthias!

It's really helpful! Hope you have a good weekend :)
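
Since the thread asks about counting messages: the spread between beginning and end offsets gives a quick upper bound, though (per Matthias's explanation below) transaction markers and compaction mean some offsets hold no readable record, so the true count can be lower. A hedged sketch, with the bootstrap address assumed and the topic name taken from the thread:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicSizeUpperBound {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        String topic = "tracking_events";
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(topic, info.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            long upperBound = partitions.stream()
                    .mapToLong(tp -> end.get(tp) - begin.get(tp))
                    .sum();
            // Commit/abort markers also consume offsets under EOS, so the
            // actual number of readable records can be smaller than this.
            System.out.println("At most " + upperBound + " records in " + topic);
        }
    }
}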

On Thu, Jun 20, 2019 at 5:30 PM Matthias J. Sax  wrote:

> The observed behavior is expected.
>
> >> For example, if we send 2615 events to an empty topic, we expect the
> >> end of the topic to be offset 2616.
>
> This is a wrong expectation. Even if Kafka behaves that way for non-EOS,
> there is no "contract" that guarantees that offsets are consecutive.
> Kafka only guarantees that offsets are ascending.
>
> In fact, if you enable log compaction, there are also "missing offsets"
> after a topic was compacted.
>
> For EOS, each time a transaction is committed, a commit marker is
> written into the topic, and the commit marker "eats" one offset.
> (Similarly, an abort marker is written if a transaction is aborted.)
>
> >> In addition, there are "missing offsets" in the topic. There seems to be
> >> nothing inside message offset 5, but it still counts as a message... I
> >> used Kafkacat to print out that message, and I have specified -Z (print
> >> NULL messages and keys as "NULL" instead of empty), but it didn't print out
>
> Those markers are internal and related to your "missing offsets". You
> cannot read the markers as clients will skip over them on read.
>
> There is also a related issue with regard to transactional markers:
> https://issues.apache.org/jira/browse/KAFKA-6607 Lag will never be
> reported as zero, because of the trailing transactional markers.
>
>
>
> The number of stored messages in a partition is not exposed by Kafka,
> and I am not aware of a way to get this information.
>
>
>
> -Matthias
>
>
> On 6/20/19 2:42 PM, Trinity Xia wrote:
> > Hi,
> >
> > We are using Kafka Streams 2.2.1 and Kafka 2.2.0, and we noticed that the
> > end offset number is larger than the number of events sent to a topic if
> > we set *processing guarantee* as *exactly once* in a Kafka Streams app.
> >
> > For example, if we send 2615 events to an empty topic, we expect the end
> > of the topic to be offset 2616. However, if we set *processing guarantee*
> > as *exactly once*, the end of the topic is offset 2619 instead.
> >
> > Key: 5-5 offset: 0
> > Key: 5--5 offset: 1
> > Key: 5--5 offset: 2
> > Key: 5--5 offset: 3
> > Key: 5--5 offset: 4
> > Key: 25--f offset: 6
> > Key: 25--f offset: 7
> > Key: 25--f offset: 8
> > .
> >
> > % Reached end of topic tracking_events[0] at offset 2619: exiting
> >
> > In addition, there are "missing offsets" in the topic. There seems to be
> > nothing inside message offset 5, but it still counts as a message... I
> > used Kafkacat to print out that message, and I have specified -Z (print
> > NULL messages and keys as "NULL" instead of empty), but it didn't print
> > out anything either...
> >
> > Has anyone encountered the same issue before? Are there any other methods
> > to calculate the number of messages inside a topic efficiently? Thanks
> > for the help :)
> >
> > Trinity
> >
>
>

-- 
Trinity Xia
Software Engineer at Scribd
333 Bush Street | San Francisco, CA 94104
trini...@scribd.com | www.scribd.com


Re: [VOTE] 2.3.0 RC3

2019-06-21 Thread Kamal Chandraprakash
+1 (non-binding)

* Ran unit and integration tests on 2.11 and 2.12
* Verified quick start
* Ran internal apps on the 3-node cluster

On Thu, Jun 20, 2019 at 3:33 AM Colin McCabe  wrote:

> Hi all,
>
> We discovered some problems with the second release candidate (RC2) of
> 2.3.0.  Specifically, KAFKA-8564.  I've created a new RC which includes the
> fix for this issue.
>
> Check out the release notes for the 2.3.0 release here:
> https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/RELEASE_NOTES.html
>
> The vote will go until Saturday, June 22nd, or until we create another RC.
>
> * Kafka's KEYS file containing PGP keys we use to sign the release can be
> found here:
> https://kafka.apache.org/KEYS
>
> * The release artifacts to be voted upon (source and binary) are here:
> https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/javadoc/
>
> * The tag to be voted upon (off the 2.3 branch) is the 2.3.0 tag:
> https://github.com/apache/kafka/releases/tag/2.3.0-rc3
>
> best,
> Colin
>
>


Subject: Kafka hangs on startup

2019-06-21 Thread Marcelo Barbosa
Hi Nik, 
It seems that the problem is not in the startup itself, because the logs show
that it took about 2 seconds to start. How exactly did you detect that it got
stuck?
Cheers, 
Barbosa

Sent from Yahoo Mail on Android
On Fri, Jun 21, 2019 at 5:52, Nik Hodgkinson <11x...@gmail.com> wrote:
I'm experiencing an issue I'm not sure how to start to track down. My Kafka
brokers often start up and then hang partway through startup. The log has
nothing useful to point to a problem, and I'm not sure where to go from
here. I've pasted a startup log below; please advise on how to proceed.


Kafka hangs on startup

2019-06-21 Thread Nik Hodgkinson
I'm experiencing an issue I'm not sure how to start to track down. My Kafka
brokers often start up and then hang partway through startup. The log has
nothing useful to point to a problem, and I'm not sure where to go from
here. I've pasted a startup log below; please advise on how to proceed.

Thanks,
-Nik
11x...@gmail.com
(913) 927-4891

--- LOG BEGINS ---
[2019-06-21 08:36:48,447] INFO Registered kafka:type=kafka.Log4jController
MBean (kafka.utils.Log4jControllerRegistration$)
[2019-06-21 08:36:48,997] INFO starting (kafka.server.KafkaServer)
[2019-06-21 08:36:49,002] INFO Connecting to zookeeper on
zookeeper-local-headless:2181/kafka-local (kafka.server.KafkaServer)
[2019-06-21 08:36:49,021] INFO [ZooKeeperClient] Initializing a new session
to zookeeper-local-headless:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-06-21 08:36:49,025] INFO Client
environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03,
built on 06/29/2018 00:39 GMT (org.apache.zookeeper.ZooKeeper)
[2019-06-21 08:36:49,025] INFO Client
environment:host.name=kafka-local-stateful-set-2.kafka-local-headless.default.svc.cluster.local
(org.apache.zookeeper.ZooKeeper)
[2019-06-21 08:36:49,025] INFO Client environment:java.version=1.8.0_212
(org.apache.zookeeper.ZooKeeper)
[2019-06-21 08:36:49,025] INFO Client environment:java.vendor=Oracle
Corporation (org.apache.zookeeper.ZooKeeper)
[2019-06-21 08:36:49,025] INFO Client
environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
(org.apache.zookeeper.ZooKeeper)
[2019-06-21 08:36:49,026] INFO Client