[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-04-24 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331171#comment-17331171
 ] 

Bruno Cadonna commented on KAFKA-10493:
---

Are you proposing to drop out-of-order records even though log compaction with 
source-topic optimization would remove the correct record? Wouldn't that mean 
we would replace a deterministic behavior with a non-deterministic one?

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than that of the current value in the store) are not being 
> ignored but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!
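
A minimal sketch of the intended semantics, with illustrative names 
(OutOfOrderCheck/isOutOfOrder are not the actual Kafka Streams code; the real 
check lives in the ValueAndTimestampSerializer linked above):

    public final class OutOfOrderCheck {
        // An update is out of order when its timestamp is older than the
        // timestamp of the value currently in the store.
        static boolean isOutOfOrder(long storedTimestampMs, long newTimestampMs) {
            return newTimestampMs < storedTimestampMs;
        }

        public static void main(String[] args) {
            long stored = 1_000L;
            System.out.println(isOutOfOrder(stored, 900L));   // true: drop, neither store nor forward
            System.out.println(isOutOfOrder(stored, 1_100L)); // false: update the store and forward
        }
    }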



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-24 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-826067611


   @ijuma quick update: I made some progress developing a ~~_quick and 
dirty_~~ **fast** solution for this... will update this ticket today or 
tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-24 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331194#comment-17331194
 ] 

Jose Armando Garcia Sancio commented on KAFKA-10800:


{quote}I think it's easy to understand 3, but I'm curious why we would need to 
do 1 and 2? I guess there is some benefit or restriction that I haven't 
realized? Thanks!{quote}

While operating this feature, users may end up with the content of a snapshot 
on disk but not its original file name. In that case we may want to identify 
the snapshot id for that file. We can do that if we store `endOffset - 1` as 
the baseOffset of every batch and the `epoch` on every batch. Actually, this 
may not be easy to do with the current code, since we use `BatchAccumulator`. 
I am okay doing this as part of another PR.
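
A quick sketch of that recovery arithmetic, with hypothetical names (SnapshotId 
and recover are illustrative, not the raft client API):

    public final class SnapshotIdRecovery {
        record SnapshotId(long endOffset, int epoch) {}

        // If every batch in a snapshot file carries baseOffset = endOffset - 1
        // and the snapshot epoch, any batch header is enough to rebuild the id.
        static SnapshotId recover(long batchBaseOffset, int batchEpoch) {
            return new SnapshotId(batchBaseOffset + 1, batchEpoch);
        }
    }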

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer, we should 
> validate that the following holds:
>  # The end offset and epoch of the snapshot are less than the high-watermark.
>  # The end offset and epoch of the snapshot are valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer, because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.
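
A rough sketch of the two checks, using hypothetical names (validate and 
epochForOffsetFromCache are assumptions, not the actual raft client code):

    public final class SnapshotIdValidation {
        static void validate(long snapshotEndOffset, int snapshotEpoch,
                             long highWatermark, int epochForOffsetFromCache) {
            // 1. The snapshot's end offset must stay below the high-watermark.
            if (snapshotEndOffset >= highWatermark) {
                throw new IllegalArgumentException("snapshot end offset " + snapshotEndOffset
                    + " is not less than high-watermark " + highWatermark);
            }
            // 2. The snapshot's epoch must agree with the leader epoch cache
            //    for that offset.
            if (snapshotEpoch != epochForOffsetFromCache) {
                throw new IllegalArgumentException("snapshot epoch " + snapshotEpoch
                    + " does not match the leader epoch cache");
            }
        }
    }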



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] AndroideRob opened a new pull request #10591: Fix minor bugs in the existing documentation

2021-04-24 Thread GitBox


AndroideRob opened a new pull request #10591:
URL: https://github.com/apache/kafka/pull/10591


   Just finished reading the official docs and found some inconsistencies; 
here they are.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10592: MINOR: Remove redundant test files and close LogSegment after test

2021-04-24 Thread GitBox


dengziming commented on pull request #10592:
URL: https://github.com/apache/kafka/pull/10592#issuecomment-826089682


   Hello, @kowshik PTAL, also cc @junrao .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-24 Thread Haoran Xuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331232#comment-17331232
 ] 

Haoran Xuan commented on KAFKA-10800:
-

I got you~

And yeah, I was also thinking that we might need to make some changes to the 
BatchAccumulator to make this happen. Since this is independent of this Jira, I 
think we can cover it in a separate task, and I'm happy to take it.

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer, we should 
> validate that the following holds:
>  # The end offset and epoch of the snapshot are less than the high-watermark.
>  # The end offset and epoch of the snapshot are valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer, because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5761) Serializer API should support ByteBuffer

2021-04-24 Thread Kirill Rodionov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331240#comment-17331240
 ] 

Kirill Rodionov commented on KAFKA-5761:


please see https://github.com/apache/kafka/pull/10590

> Serializer API should support ByteBuffer
> 
>
> Key: KAFKA-5761
> URL: https://issues.apache.org/jira/browse/KAFKA-5761
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.11.0.0
>Reporter: Bhaskar Gollapudi
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: features, performance
>
> Consider the Serializer: its main method is
> byte[] serialize(String topic, T data);
> Producer applications create an implementation that takes in an instance (of 
> T) and converts it to a byte[]. This byte array is allocated anew for each 
> message and handed over to the Kafka producer internals, which write the 
> bytes to a buffer/network socket. When the next message arrives, the 
> serializer should try to reuse the existing byte[] for the new message 
> instead of creating a new one. This requires two things:
> 1. The hand-off of the bytes to the buffer/socket and the reuse of the byte[] 
> must happen on the same thread.
> 2. There should be a way to mark the end of the available bytes in the 
> byte[].
> The first is reasonably simple to understand. If it does not hold, and 
> without other necessary synchronization, the byte[] gets corrupted, and so 
> does the message written to the buffer/socket. However, this requirement is 
> easy to meet for a producer application, because it controls the threads on 
> which the serializer is invoked.
> The second is where the problem lies with the current API. It does not allow 
> a variable number of bytes to be read from a container; it is limited by the 
> byte[]'s length. This forces the producer to either
> 1. create a new byte[] for a message that is bigger than the previous one, or
> 2. decide on a max size and use padding.
> Both are cumbersome and error prone, and may waste network bandwidth.
> Instead, a Serializer with this method:
> ByteBuffer serialize(String topic, T data);
> would make it possible to implement a reusable bytes container for clients 
> and avoid allocations for each message.
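
A minimal sketch of the reuse pattern the ticket asks for, assuming a 
hypothetical ByteBuffer-returning serializer (the copy through a byte[] is for 
brevity only; flip() sets the limit, which marks the end of the available 
bytes):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class ReusableStringSerializer {
        // Reused across messages; safe only if serialize() and the consumption
        // of the returned buffer happen on the same thread (requirement 1).
        private ByteBuffer buffer = ByteBuffer.allocate(1024);

        public ByteBuffer serialize(String topic, String data) {
            byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
            if (buffer.capacity() < bytes.length) {
                buffer = ByteBuffer.allocate(bytes.length); // grow only when needed
            }
            buffer.clear();
            buffer.put(bytes);
            buffer.flip(); // limit marks the end of the available bytes (requirement 2)
            return buffer;
        }
    }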



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #10592: MINOR: Remove redundant test files and close LogSegment after test

2021-04-24 Thread GitBox


kowshik commented on a change in pull request #10592:
URL: https://github.com/apache/kafka/pull/10592#discussion_r619682506



##
File path: core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
##
@@ -35,6 +39,17 @@ class LogSegmentsTest {
     LogTestUtils.createSegment(offset, logDir, indexIntervalBytes, time)
   }
 
+  @BeforeEach
+  def setup(): Unit = {
+    logDir = TestUtils.tempDir()
+  }
+
+  @AfterEach
+  def teardown(): Unit = {
+    segmentsBuffer.foreach(_.close())

Review comment:
   The segments are already closed from inside `LogSegments#close()`, so 
closing them individually can be skipped in `teardown()`. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegments.scala#L78

##
File path: core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
##
@@ -129,12 +149,13 @@ class LogSegmentsTest {
     assertEquals(Seq(seg3), segments.values(3, 4).toSeq)
     assertEquals(Seq(), segments.values(4, 4).toSeq)
     assertEquals(Seq(seg4), segments.values(4, 5).toSeq)
-

Review comment:
   Lets add a `segments.close()` call here.

##
File path: core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
##
@@ -129,12 +149,13 @@ class LogSegmentsTest {
     assertEquals(Seq(seg3), segments.values(3, 4).toSeq)
     assertEquals(Seq(), segments.values(4, 4).toSeq)
     assertEquals(Seq(seg4), segments.values(4, 5).toSeq)
-
   }
 
   @Test
   def testClosestMatchOperations(): Unit = {
     val segments = new LogSegments(topicPartition)

Review comment:
   Lets add a `segments.close()` call at the end of this function.

##
File path: core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
##
@@ -18,15 +18,19 @@ package kafka.log
 
 import java.io.File
 
+import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.mutable
 
 class LogSegmentsTest {
 
   val topicPartition = new TopicPartition("topic", 0)
   var logDir: File = _
+  val segmentsBuffer: mutable.ArrayBuffer[LogSegments] = mutable.ArrayBuffer[LogSegments]()

Review comment:
   This buffer can be removed, as I described below by calling 
`LogSegments#close()` inside each test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10592: MINOR: Remove redundant test files and close LogSegment after test

2021-04-24 Thread GitBox


kowshik commented on a change in pull request #10592:
URL: https://github.com/apache/kafka/pull/10592#discussion_r619682506



##
File path: core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
##
@@ -35,6 +39,17 @@ class LogSegmentsTest {
     LogTestUtils.createSegment(offset, logDir, indexIntervalBytes, time)
   }
 
+  @BeforeEach
+  def setup(): Unit = {
+    logDir = TestUtils.tempDir()
+  }
+
+  @AfterEach
+  def teardown(): Unit = {
+    segmentsBuffer.foreach(_.close())

Review comment:
   The segments are already closed from inside `LogSegments#close()`, so 
closing them here can be skipped in `teardown()`. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegments.scala#L78




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12713) Report broker/consumer fetch latency more accurately

2021-04-24 Thread Ming Liu (Jira)
Ming Liu created KAFKA-12713:


 Summary: Report broker/consumer fetch latency more accurately
 Key: KAFKA-12713
 URL: https://issues.apache.org/jira/browse/KAFKA-12713
 Project: Kafka
  Issue Type: Bug
Reporter: Ming Liu


The fetch latency is an important metric to monitor for cluster performance. 
With acks=all, the produce latency is affected primarily by the broker fetch 
latency.

However, the currently reported fetch latency does not reflect the true fetch 
latency, because a fetch sometimes needs to stay in purgatory and wait for up 
to replica.fetch.wait.max.ms when data is not available. This greatly distorts 
the real P50, P99, etc.

I would like to propose a KIP to track the real fetch latency for both broker 
followers and consumers.
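
To make the arithmetic concrete, a hypothetical illustration (the method and 
the purgatory-wait bookkeeping are assumptions, not existing broker metrics):

    public final class FetchLatencyExample {
        // The "real" latency excludes the time a fetch deliberately parked in
        // purgatory waiting for data (bounded by replica.fetch.wait.max.ms).
        static long realFetchLatencyMs(long observedMs, long purgatoryWaitMs) {
            return Math.max(0, observedMs - purgatoryWaitMs);
        }

        public static void main(String[] args) {
            // A fetch that waited 450 ms for data and spent 5 ms being served:
            System.out.println(realFetchLatencyMs(455, 450)); // prints 5
        }
    }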

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-24 Thread Ming Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ming Liu updated KAFKA-12713:
-
Summary: Report "REAL" broker/consumer fetch latency  (was: Report 
broker/consumer fetch latency more accurately)

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster performance. 
> With acks=all, the produce latency is affected primarily by the broker fetch 
> latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes needs to stay in purgatory and wait for up 
> to replica.fetch.wait.max.ms when data is not available. This greatly 
> distorts the real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both broker 
> followers and consumers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bruto1 commented on pull request #10590: KAFKA-5761: support ByteBuffer as value in ProducerRecord and avoid redundant serialization when it's used

2021-04-24 Thread GitBox


bruto1 commented on pull request #10590:
URL: https://github.com/apache/kafka/pull/10590#issuecomment-826126350


   I guess the failing tests are par for the course here
   pls review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bruto1 edited a comment on pull request #10590: KAFKA-5761: support ByteBuffer as value in ProducerRecord and avoid redundant serialization when it's used

2021-04-24 Thread GitBox


bruto1 edited a comment on pull request #10590:
URL: https://github.com/apache/kafka/pull/10590#issuecomment-826126350


   I guess the failing tests are par for the course here
   pls review! @chia7712 ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-24 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331311#comment-17331311
 ] 

Ismael Juma commented on KAFKA-12713:
-

It would definitely be useful to have a better fetch latency metric. It's not 
obvious how it should work, however. Do you have suggestions?

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster performance. 
> With acks=all, the produce latency is affected primarily by the broker fetch 
> latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes needs to stay in purgatory and wait for up 
> to replica.fetch.wait.max.ms when data is not available. This greatly 
> distorts the real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both broker 
> followers and consumers.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-04-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331319#comment-17331319
 ] 

Matthias J. Sax commented on KAFKA-10493:
-

{quote}Without KIP-280, it would always be an issue
{quote}
Well, yes and no. Without KIP-280, dropping out-of-order records in 
combination with source-topic optimization is the only problematic case.
{quote}then yes we are still in bad state, but at least we are not making 
things worse.
{quote}
Whether shipping this ticket would make things worse or not seems to be the 
key question...

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than that of the current value in the store) are not being 
> ignored but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #10592: MINOR: Remove redundant test files and close LogSegment after test

2021-04-24 Thread GitBox


dengziming commented on pull request #10592:
URL: https://github.com/apache/kafka/pull/10592#issuecomment-826196454


   @kowshik, thank you for your suggestions. I have rewritten the code 
according to your comments, PTAL again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 opened a new pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-24 Thread GitBox


feyman2016 opened a new pull request #10593:
URL: https://github.com/apache/kafka/pull/10593


   
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 closed pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-24 Thread GitBox


feyman2016 closed pull request #10593:
URL: https://github.com/apache/kafka/pull/10593


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12453) Guidance on whether a topology is eligible for optimisation

2021-04-24 Thread Patrick O'Keeffe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331366#comment-17331366
 ] 

Patrick O'Keeffe commented on KAFKA-12453:
--

Thanks for the explanation [~mjsax]

Agree it makes sense to create a new page for topology optimisation; I could 
have done with similar guidance myself!

A couple of questions:
 # There is an FAQ for topology optimisation on the Confluent site, which goes 
further than the Apache Kafka docs but still not far enough. However, it does 
provide a good starting point. Do I need to be careful to avoid similarities 
with the Confluent text, e.g. writing the page as an FAQ or having similarly 
named sections?
 # It has been a while since I've written any HTML. Any advice on testing that 
the page is formatted correctly and will render?

> Guidance on whether a topology is eligible for optimisation
> ---
>
> Key: KAFKA-12453
> URL: https://issues.apache.org/jira/browse/KAFKA-12453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick O'Keeffe
>Priority: Major
>
> Since the introduction of KStream.toTable() in Kafka 2.6.x, the decision 
> about whether a topology is eligible for optimisation is no longer a simple 
> one; it depends on whether toTable() operations are preceded by key-changing 
> operators.
> This decision requires expert-level knowledge, and there are serious 
> implications, in terms of fault tolerance, associated with getting it wrong.
> Some ideas spring to mind around how to guide developers to make the correct 
> decision (see the sketch after this list):
>  # Topology.describe() could indicate whether this topology is eligible for 
> optimisation
>  # Topologies could be automatically optimised; note this may have an impact 
> at deployment time, in that an application reset may be required. The 
> developer would need to be made aware of this and adjust the deployment plan 
> accordingly
>  
>  
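
Regarding idea 1, a short sketch of what can be inspected today (note: the 
config key is spelled TOPOLOGY_OPTIMIZATION in 2.6 and 
TOPOLOGY_OPTIMIZATION_CONFIG in later releases; the topic name is a 
placeholder):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class OptimizationCheck {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input")
                   .selectKey((k, v) -> v) // key-changing operation before toTable()
                   .toTable();
            Properties props = new Properties();
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
            // Passing the props to build() applies the optimization; comparing
            // describe() output with and without it shows whether the topology
            // was actually rewritten.
            Topology topology = builder.build(props);
            System.out.println(topology.describe());
        }
    }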



--
This message was sent by Atlassian Jira
(v8.3.4#803005)