[GitHub] [kafka] showuon merged pull request #11880: MINOR: Fix comments in TransactionsTest

2022-03-10 Thread GitBox


showuon merged pull request #11880:
URL: https://github.com/apache/kafka/pull/11880


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #11881: MINOR: revert back to 60s session timeout for static membership roll bounce test

2022-03-10 Thread GitBox


showuon opened a new pull request #11881:
URL: https://github.com/apache/kafka/pull/11881


   In this PR: 
https://github.com/apache/kafka/pull/11236/files#diff-bd4be654a82d362772b2010a0fa22a44916ff0da241ca7e6072741f7ef710136L689-R696
, we tried to reduce the session timeout so the tests run faster. However, we 
accidentally overrode the session timeout in the static membership tests, which 
deliberately use a longer session timeout to verify that a static member won't 
trigger a rebalance. Revert it back to 60 seconds.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ddrid opened a new pull request #11880: MINOR: Fix comments in TransactionsTest

2022-03-10 Thread GitBox


ddrid opened a new pull request #11880:
URL: https://github.com/apache/kafka/pull/11880


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] kowshik closed pull request #11815: MINOR: Eliminate FileRecords mutable flag

2022-03-10 Thread GitBox


kowshik closed pull request #11815:
URL: https://github.com/apache/kafka/pull/11815


   






[GitHub] [kafka] XiaoyiPeng opened a new pull request #11879: KAFKA-13728: fix PushHttpMetricsReporter no longer pushes metrics when network failure is recovered.

2022-03-10 Thread GitBox


XiaoyiPeng opened a new pull request #11879:
URL: https://github.com/apache/kafka/pull/11879


The class `PushHttpMetricsReporter` no longer pushes metrics once a network 
failure has recovered.
   
   I debugged the code and found the problem here :
   
https://github.com/apache/kafka/blob/dc36dedd28ff384218b669de13993646483db966/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java#L214-L221
   

   
   When we submit a task to the `ScheduledThreadPoolExecutor` that needs to be 
executed periodically, if the task throws an exception that is not swallowed, 
the task will no longer be scheduled for execution.
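   This failure mode is easy to reproduce with a plain `ScheduledThreadPoolExecutor`. 
A minimal, self-contained sketch (class and field names here are illustrative, not 
from the Kafka code base):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SwallowDemo {
    static final AtomicInteger unguarded = new AtomicInteger();
    static final AtomicInteger guarded = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // Lets the exception escape: the executor cancels all further runs.
        executor.scheduleAtFixedRate(() -> {
            unguarded.incrementAndGet();
            throw new RuntimeException("simulated network failure");
        }, 0, 50, TimeUnit.MILLISECONDS);

        // Swallows the exception: the periodic schedule survives the failure.
        executor.scheduleAtFixedRate(() -> {
            try {
                guarded.incrementAndGet();
                throw new RuntimeException("simulated network failure");
            } catch (RuntimeException e) {
                // swallow (a real reporter would log and retry on the next tick)
            }
        }, 0, 50, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        executor.shutdownNow();
        System.out.println("unguarded runs: " + unguarded.get()); // stays at 1
        System.out.println("guarded runs:   " + guarded.get());   // keeps climbing
    }
}
```

   Wrapping the body of the push task in a try/catch, as the guarded task above 
does, is one way to keep a periodic reporter alive across transient failures.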
   
   






[jira] [Created] (KAFKA-13728) PushHttpMetricsReporter no longer pushes metrics when network failure is recovered.

2022-03-10 Thread XiaoyiPeng (Jira)
XiaoyiPeng created KAFKA-13728:
--

 Summary: PushHttpMetricsReporter no longer pushes metrics when 
network failure is recovered.
 Key: KAFKA-13728
 URL: https://issues.apache.org/jira/browse/KAFKA-13728
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 3.1.0
Reporter: XiaoyiPeng


The class *PushHttpMetricsReporter* no longer pushes metrics once a network 
failure has recovered.

I debugged the code and found the problem here :
[https://github.com/apache/kafka/blob/dc36dedd28ff384218b669de13993646483db966/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java#L214-L221]

 

When we submit a task to the *ScheduledThreadPoolExecutor* that needs to be 
executed periodically, if the task throws an exception that is not swallowed, 
the task will no longer be scheduled for execution.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13727) Edge case in cleaner can result in premature removal of ABORT marker

2022-03-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13727:

Description: 
The log cleaner works by first building a map of the active keys beginning from 
the dirty offset, and then scanning forward from the beginning of the log to 
decide which records should be retained based on whether they are included in 
the map. The map of keys has a limited size. As soon as it fills up, we stop 
building it. The offset corresponding to the last record that was included in 
the map becomes the next dirty offset. Then when we are cleaning, we stop 
scanning forward at the dirty offset. Or to be more precise, we continue 
scanning until the end of the segment which includes the dirty offset, but all 
records above that offset are copied as-is without checking the map of active 
keys. 

Compaction is complicated by the presence of transactions. The cleaner must 
keep track of which transactions have data remaining so that it can tell when 
it is safe to remove the respective markers. It works a bit like the consumer. 
Before scanning a segment, the cleaner consults the aborted transaction index 
to figure out which transactions have been aborted. All other transactions are 
considered committed.

The problem we have found is that the cleaner does not take into account the 
range of offsets between the dirty offset and the end offset of the segment 
containing it when querying ahead for aborted transactions. This means that 
when the cleaner is scanning forward from the dirty offset, it does not have 
the complete set of aborted transactions. The main consequence of this is that 
abort markers associated with transactions which start within this range of 
offsets become eligible for deletion even before the corresponding data has 
been removed from the log.

Here is an example. Suppose that the log contains the following entries:

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b
offset=7, ABORT

Suppose we have an offset map which can only contain 2 keys and the dirty 
offset starts at 0. The first time we scan forward, we will build a map with 
keys a and b, which will allow us to move the dirty offset up to 3. Due to the 
issue documented here, we will not detect the aborted transaction starting at 
offset 6. But it will not be eligible for deletion on this round of cleaning 
because it is bound by `delete.retention.ms`. Instead, our new logic will set 
the deletion horizon for this batch to the current time plus the 
configured `delete.retention.ms`.

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b
offset=7, ABORT (deleteHorizon: N)

Suppose that the time reaches N+1 before the next cleaning. We will begin from 
the dirty offset of 3 and collect keys c and d before stopping at offset 6. 
Again, we will not detect the aborted transaction beginning at offset 6 since 
it is out of the range. This time when we scan, the marker at offset 7 will be 
deleted because the transaction will be seen as empty and now the deletion 
horizon has passed. So we end up with this state:

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b

Effectively it becomes a hanging transaction. The interesting thing is that we 
might not even detect it. As far as the leader is concerned, it had already 
completed that transaction, so it is not expecting any additional markers. The 
transaction index would have been rewritten without the aborted transaction 
when the log was cleaned, so any consumer fetching the data would see the 
transaction as committed. On the other hand, if we did a reassignment to a new 
replica, or if we had to rebuild the full log state during recovery, then we 
would suddenly detect it.

I am not sure how likely this scenario is in practice. I think it's fair to say 
it is an extremely rare case: the cleaner has to fail to clean a full segment 
at least twice, and enough time still needs to pass for the marker's 
deletion horizon to be reached. Perhaps it is possible if the cardinality of 
keys is very high and the configured memory limit for the cleaner is low.

  was:
The log cleaner works by first building a map of the active keys beginning from 
the dirty offset, and then scanning forward from the beginning of the log to 
decide which records should be retained based on whether they are included in 
the map. The map of keys has a limited size. As soon as it fills up, we stop 
building it. The offset corresponding to the last record that was included in 
the map becomes the next dirty offset. Then when we are cleaning, we stop 
scanning forward at the dirty offset. Or to be more precise, we continue 
scanning until the end of the segment which includes 

[jira] [Created] (KAFKA-13727) Edge case in cleaner can result in premature removal of ABORT marker

2022-03-10 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13727:
---

 Summary: Edge case in cleaner can result in premature removal of 
ABORT marker
 Key: KAFKA-13727
 URL: https://issues.apache.org/jira/browse/KAFKA-13727
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


The log cleaner works by first building a map of the active keys beginning from 
the dirty offset, and then scanning forward from the beginning of the log to 
decide which records should be retained based on whether they are included in 
the map. The map of keys has a limited size. As soon as it fills up, we stop 
building it. The offset corresponding to the last record that was included in 
the map becomes the next dirty offset. Then when we are cleaning, we stop 
scanning forward at the dirty offset. Or to be more precise, we continue 
scanning until the end of the segment which includes the dirty offset, but all 
records above that offset are copied as-is without checking the map of active 
keys. 

Compaction is complicated by the presence of transactions. The cleaner must 
keep track of which transactions have data remaining so that it can tell when 
it is safe to remove the respective markers. It works a bit like the consumer. 
Before scanning a segment, the cleaner consults the aborted transaction index 
to figure out which transactions have been aborted. All other transactions are 
considered committed.

The problem we have found is that the cleaner does not take into account the 
range of offsets between the dirty offset and the end offset of the segment 
containing it when querying ahead for aborted transactions. This means that 
when the cleaner is scanning forward from the dirty offset, it does not have 
the complete set of aborted transactions. The main consequence of this is that 
abort markers associated with transactions which start within this range of 
offsets become eligible for deletion even before the corresponding data has 
been removed from the log.

Here is an example. Suppose that the log contains the following entries:

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b
offset=7, ABORT

Suppose we have an offset map which can only contain 2 keys and the dirty 
offset starts at 0. The first time we scan forward, we will build a map with 
keys a and b, which will allow us to move the dirty offset up to 3. Due to the 
issue documented here, we will not detect the aborted transaction starting at 
offset 6. But it will not be eligible for deletion on this round of cleaning 
because it is bound by `delete.retention.ms`. Instead, our new logic will set 
the deletion horizon for this batch to the current time plus the 
configured `delete.retention.ms`.

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b
offset=7, ABORT (deleteHorizon: N)

Suppose that the time reaches N+1 before the next cleaning. We will begin from 
the dirty offset of 3 and collect keys c and d before stopping at offset 6. 
Again, we will not detect the aborted transaction beginning at offset 6 since 
it is out of the range. This time when we scan, the marker at offset 7 will be 
deleted because the transaction will be seen as empty and now the deletion 
horizon has passed. So we end up with this state:

offset=0, key=a
offset=1, key=b
offset=2, COMMIT
offset=3, key=c
offset=4, key=d
offset=5, COMMIT
offset=6, key=b

Effectively it becomes a hanging transaction. The interesting thing is that we 
might not even detect it. As far as the leader is concerned, it had already 
completed that transaction, so it is not expecting any additional markers. The 
transaction index would have been rewritten without the aborted transaction 
when the log was cleaned, so any consumer fetching the data would see the 
transaction as committed. On the other hand, if we did a reassignment to a new 
replica, or if we had to rebuild the full log state during recovery, then we 
would suddenly detect it.

I am not sure how likely this scenario is in practice. I think it's fair to say 
it is an extremely rare case: the cleaner has to fail to clean a full segment 
at least twice, and enough time still needs to pass for the marker's 
deletion horizon to be reached. Perhaps it is possible if the cardinality of 
keys is very high and the configured memory limit for the cleaner is low.





[GitHub] [kafka] wcarlson5 commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-10 Thread GitBox


wcarlson5 commented on pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064636537


   @guozhangwang yes, that would be a concern.
   
   I think we need to document that these calls need to be made in the same 
order with the same topologies for each client. That is what KSQL does to make 
sure it works.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-10 Thread GitBox


guozhangwang commented on pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064634922


   @wcarlson5 @ableegoldman I'm wondering if we could still have livelock 
scenarios like this: say we have two topologies A and B, and two threads a and 
b, each on a different KS instance. Each thread tries to remove one topology 
at a time, getting the future to make sure it is cleaned up, BUT they do it in 
a different order.
   
   1) Thread a calls `remove(A)` and gets a futureA.
   2) Thread b calls `remove(B)` and gets a futureB.
   3) Thread a calls `futureA.get()` trying to delete offsets, and after that 
it tries to `remove(B)`, but gets blocked on topology B not being removed by 
thread b yet.
   4) Thread b calls `futureB.get()` trying to delete offsets, and after that 
it tries to `remove(A)`, but gets blocked on topology A not being removed by 
thread a yet.
   
   Would that be a concern?
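   The ordering hazard described above is essentially the classic lock-ordering 
deadlock. A toy, self-contained sketch (plain `ReentrantLock`s stand in for 
per-topology removal barriers; none of these names come from the Streams API) 
showing that agreeing on one global removal order keeps both clients making 
progress:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class RemovalOrderDemo {
    // One lock per named "topology"; TreeMap keeps them in a global, sorted order.
    static final Map<String, ReentrantLock> topologies = new TreeMap<>();
    static final AtomicInteger completed = new AtomicInteger();
    static {
        topologies.put("A", new ReentrantLock());
        topologies.put("B", new ReentrantLock());
    }

    // Remove all topologies, always acquiring in the same sorted order (A before B).
    static void removeAll() {
        List<ReentrantLock> acquired = new ArrayList<>();
        try {
            for (ReentrantLock lock : topologies.values()) {
                lock.lock();          // could deadlock if clients used opposite orders
                acquired.add(lock);
            }
            completed.incrementAndGet(); // stand-in for the offset-deletion step
        } finally {
            Collections.reverse(acquired);
            for (ReentrantLock lock : acquired) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<?> clientA = pool.submit(RemovalOrderDemo::removeAll);
        Future<?> clientB = pool.submit(RemovalOrderDemo::removeAll);
        clientA.get(5, TimeUnit.SECONDS); // both finish: no circular wait is possible
        clientB.get(5, TimeUnit.SECONDS);
        pool.shutdown();
    }
}
```

   This is exactly the "same order with the same topologies for each client" 
discipline mentioned in the thread, reduced to its simplest form.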






[GitHub] [kafka] lihaosky opened a new pull request #11878: fix flaky EosIntegrationTest.shouldCommitCorrectOffsetIfInputTopicIsTransactional[at_least_once]

2022-03-10 Thread GitBox


lihaosky opened a new pull request #11878:
URL: https://github.com/apache/kafka/pull/11878


   In this test, we start the Kafka Streams app and then write to the input topic in 
a transaction. It's possible that when Streams commits offsets, the transaction 
hasn't finished yet, so the Streams-committed offset could be less than the 
eventual `endOffset`.
   
   This PR moves the writing to the input topic to before the Streams app is 
started.
   
   *Summary of testing strategy (including rationale)*
   Ran the test 30 times and didn't see a failure. Without this change, it always 
failed within 20 runs.
   
   for (( c=1; c<=30; c++ )); do ./gradlew cleanTest streams:test 
-PshowStandardStreams=true --tests 
EosIntegrationTest.shouldCommitCorrectOffsetIfInputTopicIsTransactional[at_least_once]
 >>  test_output 2>&1; done
   
   Output: https://gist.github.com/lihaosky/ab0661a5e453d2e0970dece6a29641f2
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-10 Thread GitBox


bbejeck commented on pull request #11870:
URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064613347


   Merged #11870 to trunk






[GitHub] [kafka] bbejeck merged pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-10 Thread GitBox


bbejeck merged pull request #11870:
URL: https://github.com/apache/kafka/pull/11870


   






[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-10 Thread GitBox


bbejeck commented on pull request #11870:
URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064609464


   test failures unrelated






[GitHub] [kafka] ijuma commented on pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-10 Thread GitBox


ijuma commented on pull request #11870:
URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064595311


   Go for it.






[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-10 Thread GitBox


bbejeck commented on pull request #11870:
URL: https://github.com/apache/kafka/pull/11870#issuecomment-1064549485


   @ijuma I was going to merge this; do you have any objections or comments?






[GitHub] [kafka] ableegoldman commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


ableegoldman commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064538600


   Merged to trunk






[GitHub] [kafka] ableegoldman merged pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


ableegoldman merged pull request #11873:
URL: https://github.com/apache/kafka/pull/11873


   






[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


ijuma commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064536922


   Thanks for the details @wcarlson5 






[GitHub] [kafka] ableegoldman commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


ableegoldman commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064522689


   Test failures are unrelated. Going to merge
   
   Thanks for digging into this @wcarlson5 and @lihaosky. I hope we can 
figure out the cause and fix it soon, as I was looking forward to having the 
rebalance reason be available in the JoinGroup :/






[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-10 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504584#comment-17504584
 ] 

Randall Hauch edited comment on KAFKA-12879 at 3/10/22, 9:22 PM:
-

Update: I'm having trouble backporting the Connect changes to the 2.5 
branch. Because this branch is so old, I'm going to just revert the AdminClient 
behavior in the 2.5 branch, and _NOT_ backport the Connect retry changes.

Note: the break due to KAFKA-12339 was never released in the 2.5 branch.


was (Author: rhauch):
Update: I'm having trouble with backporting the Connect changes to the 2.5 
branch. Because this branch is so old, I'm going to just revert the AdminClient 
behavior to the 2.5 branch, and _NOT_ backport the Connect retry changes.

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Major
> Fix For: 2.5.2, 2.8.2, 3.2.0, 3.1.1, 3.0.2, 2.7.3, 2.6.4
>
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore as well as the intended effect on {{listOffsets()}} it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.
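
The workaround described above (set `retries=1` and inspect the cause of the 
`TimeoutException`) amounts to walking the exception's cause chain. A minimal 
stdlib sketch, with `java.util.concurrent` exceptions standing in for Kafka's 
own error classes (the helper name is illustrative, not a Kafka API):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class CauseInspector {
    // Walk the cause chain and report whether any link is an instance of `type`.
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulates the wrapped exception a caller would get from the
        // listOffsets() future when the topic never appears.
        Throwable fromFuture = new ExecutionException(
                new TimeoutException("topic metadata not available"));
        System.out.println(hasCause(fromFuture, TimeoutException.class)); // prints true
    }
}
```

In the real client one would match against Kafka's own exception types instead; 
the limitation noted in the issue still applies, since `retries=1` affects every 
call made through that Admin instance.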





[jira] [Resolved] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-12879.
---
  Reviewer: Randall Hauch
Resolution: Fixed






[jira] [Updated] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12879:
--
Fix Version/s: 3.2.0






[jira] [Updated] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12879:
--
Fix Version/s: 3.0.2
   2.7.3
   2.6.4
   2.5.2
   2.8.2
   3.1.1






[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-10 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504584#comment-17504584
 ] 

Randall Hauch commented on KAFKA-12879:
---

Update: I'm having trouble with backporting the Connect changes to the 2.5 
branch. Because this branch is so old, I'm going to just revert the AdminClient 
behavior to the 2.5 branch, and _NOT_ backport the Connect retry changes.

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore as well as the intended effect on {{listOffsets()}} it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.





[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-10 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503891#comment-17503891
 ] 

Randall Hauch edited comment on KAFKA-12879 at 3/10/22, 9:13 PM:
-

The approach we decided to take was to revert the previous admin client changes 
from KAFKA-12339 to bring the admin client behavior back to previous 
expectations, and to implement retries within the KafkaBasedLog to handle cases 
like those identified in that issue.

For example, a likely root cause of KAFKA-12339 was a Connect worker 
instantiates its KafkaConfigBackingStore (and other internal topic stores), 
which creates a KafkaBasedLog that as part of start() creates the topic if it 
doesn't exist and then immediately tries to read the offsets. That reading of 
offsets can fail if the metadata for the newly created topic hasn't been 
propagated to all of the brokers. We can solve this particular root cause 
easily by retrying the reading of offsets within the KafkaBasedLog's start() 
method, and since topic metadata should be propagated relatively quickly, we 
don't need to retry for that long – and most of the time we'd probably 
successfully retry within a few retries.

I've just merged to trunk a PR that does this. When trying to backport this, 
some of the newer tests were flaky, so [~pnee] created another PR (plus 
another) to hopefully eliminate that flakiness, and it seemed to work. 

I'm in the process of backporting this all the way back to 2.6 -2.5- branch, 
since that's how far back the regression from KAFKA-12339 was backported.
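The retry-within-start() fix described above can be sketched with a generic retry helper. This is an illustrative sketch, not the actual `KafkaBasedLog` code; the method name, timeout, and backoff values are assumptions for the example.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class StartupRetrySketch {

    /**
     * Retries the supplied read until it succeeds or the overall timeout passes.
     * Mirrors the idea of retrying the initial offset read in KafkaBasedLog.start()
     * while metadata for a newly created topic propagates to the brokers.
     */
    static <T> T readWithRetry(Supplier<T> read, Duration timeout, Duration backoff)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return read.get();
            } catch (RuntimeException e) { // e.g. UnknownTopicOrPartitionException
                if (System.nanoTime() >= deadline) {
                    throw e; // give up once the overall timeout is exhausted
                }
                Thread.sleep(backoff.toMillis()); // metadata usually propagates quickly
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] attempts = {0};
        // Simulate stale metadata: fail twice, then succeed.
        Long offset = readWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new RuntimeException("unknown topic");
            }
            return 42L;
        }, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println(offset); // prints "42"
    }
}
```

Because propagation is fast in the common case, a short overall timeout with a small backoff succeeds within a few attempts, which matches the reasoning in the comment above.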


was (Author: rhauch):
The approach we decided to take was to revert the previous admin client changes 
from KAFKA-12339 to bring the admin client behavior back to previous 
expectations, and to implement retries within the KafkaBasedLog to handle cases 
like those identified in that issue.

For example, a likely root cause of KAFKA-12339 was a Connect worker 
instantiates its KafkaConfigBackingStore (and other internal topic stores), 
which creates a KafkaBasedLog that as part of start() creates the topic if it 
doesn't exist and then immediately tries to read the offsets. That reading of 
offsets can fail if the metadata for the newly created topic hasn't been 
propagated to all of the brokers. We can solve this particular root cause 
easily by retrying the reading of offsets within the KafkaBasedLog's start() 
method, and since topic metadata should be propagated relatively quickly, we 
don't need to retry for that long – and most of the time we'd probably 
successfully retry within a few retries.

I've just merged to trunk a PR that does this. When trying to backport this, 
some of the newer tests were flaky, so [~pnee] created another PR (plus 
another) to hopefully eliminate that flakiness, and it seemed to work. 

I'm in the process of backporting this all the way back to 2.5 branch, since 
that's how far back the regression from KAFKA-12339 was backported.

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore as well as the intended effect on {{listOffsets()}} it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.





[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

2022-03-10 Thread GitBox


C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r824130593



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##
@@ -28,4 +30,46 @@
 protected SourceConnectorContext context() {
 return (SourceConnectorContext) context;
 }
+
+/**
+ * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+ * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+ *
+ * For backwards compatibility, the default implementation will return 
{@code null}, but connector developers are
+ * strongly encouraged to override this method to return a non-null value 
such as
+ * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link 
ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+ *
+ * Similar to {@link #validate(Map) validate}, this method may be 
called by the runtime before the
+ * {@link #start(Map) start} method is invoked when the connector will be 
run with exactly-once support.

Review comment:
   Oooh, that's an interesting idea.
   
   I think your point about the connector doing extra work is my biggest 
concern on that front, and it seems to be validated (heh) by how `validate` is 
designed (we never actually invoke `validate` on the same `Connector` instances 
that we invoke `start` on at the moment). It may not be very frequent, but 
there certainly are some connectors out there that do some heavy lifting in 
`start` that we wouldn't want to do every time during preflight validation. For 
a practical example, see MirrorMaker2: 
https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L120-L128
   
   Adding a `configure` method would help with this issue by allowing us to 
give a long-lived config to a `Connector` that the connector can hold onto 
across successive invocations of `exactlyOnceSupport` and 
`canDefineTransactionBoundaries` (and possibly even `validate`). But, it would 
also complicate some of the existing preflight validation logic in the 
framework. Right now, we only create a single `Connector` instance per 
connector type per worker for preflight validation, and all invocations of 
`validate` and `config` are performed with that instance (see `AbstractHerder` 
[here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L431),
 
[here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L664-L666),
 and [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L461-L479)).
   
   Since we use these validation-only `Connector` instances pretty loosely and 
without any guarantees about order of invocations for `config` and `validate`, 
it's unlikely that there are connectors out there that store state in either of 
these methods, so there could be relatively low risk for creating a new 
validation-only `Connector` instance every time a config has to be validated.
   
   But I have to wonder if we're overthinking things? It seems like we're 
trying to optimize away from methods that accept a raw config map and instead 
only provide that config once per round of validation. Is the objective there 
to create a smoother/better-documented/more-strictly-defined API? Improve 
resource utilization by obviating the need of connector developers to construct 
`AbstractConfig` subclasses (or equivalents) which parse and validate 
individual properties at instantiation time?
   
   As an aside--I think the language in the Javadoc may need to be revisited 
since, as you note, it implies that these new methods will be invoked before 
`start`, which is not necessarily the case (for example, in the current 
preflight validation PR, use of these methods is mutually exclusive with the 
`start` method for a given `Connector` instance). **TODO: Rework Javadoc once 
exact behavior is agreed upon.**




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org








[GitHub] [kafka] junrao commented on pull request #11815: MINOR: Eliminate FileRecords mutable flag

2022-03-10 Thread GitBox


junrao commented on pull request #11815:
URL: https://github.com/apache/kafka/pull/11815#issuecomment-1064456133


   @kowshik : Thanks for the investigation. I agree that this does seem to 
justify keeping the mutable flag.






[jira] [Assigned] (KAFKA-13699) ProcessorContext does not expose Stream Time

2022-03-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13699:
---

Assignee: Matthias J. Sax

> ProcessorContext does not expose Stream Time
> 
>
> Key: KAFKA-13699
> URL: https://issues.apache.org/jira/browse/KAFKA-13699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Shay Lin
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: newbie
>
> As a KS developer, I would like to leverage 
> [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext]
>  and access stream time in Processor Context.
> _(Updated)_
> However, the methods currentStreamTimeMs or currentSystemTimeMs is missing 
> from for KStreams 3.0+.
> Checked with [~mjsax] , the methods are absent from the Processor API , i.e.
>  * org.apache.kafka.streams.processor.api.ProcessorContext





[GitHub] [kafka] mjsax opened a new pull request #11877: KAKFA-13699: new ProcessorContext is missing methods

2022-03-10 Thread GitBox


mjsax opened a new pull request #11877:
URL: https://github.com/apache/kafka/pull/11877


   KIP-622 added `currentSystemTimeMs()` and `currentStreamTimeMs()` to the
   `ProcessorContext`, but forgot to add both to the new
   `api.ProcessorContext`.






[GitHub] [kafka] ableegoldman commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-10 Thread GitBox


ableegoldman commented on pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064453252


   Merged to trunk






[GitHub] [kafka] ableegoldman merged pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-10 Thread GitBox


ableegoldman merged pull request #11868:
URL: https://github.com/apache/kafka/pull/11868


   






[GitHub] [kafka] ableegoldman commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-10 Thread GitBox


ableegoldman commented on pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#issuecomment-1064452750


   Test failures are unrelated. Merging






[GitHub] [kafka] bbejeck commented on pull request #11876: Minor typo: "result is a" > "result in a"

2022-03-10 Thread GitBox


bbejeck commented on pull request #11876:
URL: https://github.com/apache/kafka/pull/11876#issuecomment-1064398452


   Thanks for the contribution @aSemy!  
   
   Could you file an identical PR for 
https://github.com/apache/kafka-site/blob/asf-site/30/streams/developer-guide/dsl-api.html
  and 
https://github.com/apache/kafka-site/blob/asf-site/31/streams/developer-guide/dsl-api.html
 (both in the same PR)






[GitHub] [kafka] bbejeck commented on pull request #11876: Minor typo: "result is a" > "result in a"

2022-03-10 Thread GitBox


bbejeck commented on pull request #11876:
URL: https://github.com/apache/kafka/pull/11876#issuecomment-1064394560


   merged into trunk






[GitHub] [kafka] bbejeck merged pull request #11876: Minor typo: "result is a" > "result in a"

2022-03-10 Thread GitBox


bbejeck merged pull request #11876:
URL: https://github.com/apache/kafka/pull/11876


   






[GitHub] [kafka] bbejeck commented on pull request #11876: Minor typo: "result is a" > "result in a"

2022-03-10 Thread GitBox


bbejeck commented on pull request #11876:
URL: https://github.com/apache/kafka/pull/11876#issuecomment-1064390638


   PR is HTML only, so merging now






[GitHub] [kafka] aSemy opened a new pull request #11876: Minor typo: "result _is_ a" > "result _in_ a"

2022-03-10 Thread GitBox


aSemy opened a new pull request #11876:
URL: https://github.com/apache/kafka/pull/11876


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] mimaison commented on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient

2022-03-10 Thread GitBox


mimaison commented on pull request #11838:
URL: https://github.com/apache/kafka/pull/11838#issuecomment-1064341677


   Thanks for the PR @blcksrx ! I'll try to take a look in the next few days






[GitHub] [kafka] mimaison commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

2022-03-10 Thread GitBox


mimaison commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r824001349



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##
@@ -28,4 +30,46 @@
 protected SourceConnectorContext context() {
 return (SourceConnectorContext) context;
 }
+
+/**
+ * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+ * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+ *
+ * For backwards compatibility, the default implementation will return 
{@code null}, but connector developers are
+ * strongly encouraged to override this method to return a non-null value 
such as
+ * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link 
ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+ *
+ * Similar to {@link #validate(Map) validate}, this method may be 
called by the runtime before the
+ * {@link #start(Map) start} method is invoked when the connector will be 
run with exactly-once support.

Review comment:
   Potentially controversial idea: Have you considered always calling 
`start()` first? That way the connector should have computed its configuration 
and should be able to easily handle `exactlyOnceSupport()` and 
`canDefineTransactionBoundaries()`. Obviously `start()` may cause the connector 
to do some work before the runtime stops it in case of an invalid 
configuration. But since the proposed javadoc hinted it could be called first 
anyway, it's not making things worse.
   
   The configuration validation logic is getting more and more complex and 
showing the limits of the current APIs. I wonder if having a `configure(Map)` 
method would help us. 








[GitHub] [kafka] wcarlson5 closed pull request #11866: MINOR: Offset Result is separate

2022-03-10 Thread GitBox


wcarlson5 closed pull request #11866:
URL: https://github.com/apache/kafka/pull/11866


   






[GitHub] [kafka] wcarlson5 edited a comment on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


wcarlson5 edited a comment on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064230020


   @dajac @ijuma Hey I expanded the PR description a bit to include everything 
we know. I also was not sure how this causes a performance impact. It seems 
innocuous but it has a clear impact on our benchmarks. @lihaosky is looking to 
find out more. It looks like a buffer in the network client was overflowing 
maybe?
   
   
   Commit 2ccc834faa (the original)
   
![image](https://user-images.githubusercontent.com/18128741/157704574-d51168cf-7cfd-4103-8e59-50609b9a4049.png)
   
   Commit 59ca166d (the revert)
   
![image](https://user-images.githubusercontent.com/18128741/157704295-df73de06-2e4d-4a94-bc98-26bb0b8d955f.png)
   
   Let me know if there is anything else you would like to know






[GitHub] [kafka] wcarlson5 commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


wcarlson5 commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064230020


   @dajac @ijuma Hey I expanded the PR description a bit to include everything 
we know. I also was not sure how this causes a performance impact. It seems 
innocuous but it has a clear impact on our benchmarks. @lihaosky is looking to
find out more.
   
   
   Commit 2ccc834faa (the original)
   
![image](https://user-images.githubusercontent.com/18128741/157704574-d51168cf-7cfd-4103-8e59-50609b9a4049.png)
   
   Commit 59ca166d (the revert)
   
![image](https://user-images.githubusercontent.com/18128741/157704295-df73de06-2e4d-4a94-bc98-26bb0b8d955f.png)
   
   Let me know if there is anything else you would like to know






[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


ijuma commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064164621


   Is this method being called in a hot loop? One difference is that there is a 
string concatenation if the reason is not null.






[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


ijuma commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064157672


   We should not revert this without providing more detailed information; the 
PR doesn't include enough information.






[GitHub] [kafka] ijuma commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


ijuma commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1064156544


   Have you verified that this revert actually fixes the regression?






[GitHub] [kafka] dajac commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


dajac commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063998412


   @wcarlson5 It would be great if we could get to the bottom of this. I don't 
really understand how that could impact performance.






[GitHub] [kafka] cadonna commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-10 Thread GitBox


cadonna commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063957270


   Restarted builds






[GitHub] [kafka] kowshik edited a comment on pull request #11815: MINOR: Eliminate FileRecords mutable flag

2022-03-10 Thread GitBox


kowshik edited a comment on pull request #11815:
URL: https://github.com/apache/kafka/pull/11815#issuecomment-1063904350


   @junrao Thanks for catching this. I found the test failures to be valid. Thinking 
about it again, I'm now not sure it is feasible to proceed with eliminating 
the `mutable` flag, because it is actively used. For example, there is 
code in `FileRawSnapshotWriter` that sets the file to read-only mode: 
https://github.com/apache/kafka/blob/0f00f36/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java#L113.
 As a result, `FileRawSnapshotReader` can open the file only in read mode, and 
therefore it needs to pass `mutable=false` to `FileRecords.open()`. Most of the 
test failures are due to this.
   
   Please let me know if I'm missing something, or if there is an alternate 
approach we could take here.
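   As a self-contained illustration of the underlying constraint (independent of Kafka's `FileRecords` and snapshot classes, whose internals are not shown here), the JDK itself enforces the mode a file channel is opened with: a channel opened read-only rejects writes, so reader-side code must deliberately choose a read-only open for a frozen file.

   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.channels.NonWritableChannelException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   public class ReadOnlyOpenSketch {
       // Reads the first byte through a channel opened in read-only mode.
       static int readFirstByteReadOnly(Path p) throws IOException {
           try (FileChannel ch = FileChannel.open(p, StandardOpenOption.READ)) {
               ByteBuffer buf = ByteBuffer.allocate(1);
               ch.read(buf);
               return buf.get(0);
           }
       }

       // Attempts a write through a read-only channel; returns true if rejected.
       static boolean writeRejectedOnReadOnlyOpen(Path p) throws IOException {
           try (FileChannel ch = FileChannel.open(p, StandardOpenOption.READ)) {
               ch.write(ByteBuffer.wrap(new byte[] {1}));
               return false;
           } catch (NonWritableChannelException e) {
               return true;  // the JDK enforces the read-only open mode
           }
       }

       public static void main(String[] args) throws IOException {
           Path p = Files.createTempFile("snapshot", ".bin");
           Files.write(p, new byte[] {42});  // "writer" side produces the frozen file
           System.out.println("first byte = " + readFirstByteReadOnly(p));        // 42
           System.out.println("write rejected = " + writeRejectedOnReadOnlyOpen(p)); // true
           Files.deleteIfExists(p);
       }
   }
   ```

   This mirrors the situation described above: once the writer freezes the snapshot, the reader's open is necessarily read-only, which is why a `mutable`-style flag keeps showing up at the open site.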






[GitHub] [kafka] kowshik commented on pull request #11815: MINOR: Eliminate FileRecords mutable flag

2022-03-10 Thread GitBox


kowshik commented on pull request #11815:
URL: https://github.com/apache/kafka/pull/11815#issuecomment-1063904350


   @junrao Thanks for catching this. I found the test failures to be valid. Thinking 
about it again, I'm now not sure it is feasible to proceed with eliminating 
the `mutable` flag, because it is actively used. For example, there is 
code in `FileRawSnapshotWriter` that sets the file to read-only mode: 
https://github.com/apache/kafka/blob/0f00f36/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java#L113.
 As a result, `FileRawSnapshotReader` can open the file only in read mode, and 
therefore it needs to pass `mutable=false` to `FileRecords.open()`. Most of the 
test failures are due to this.






[GitHub] [kafka] cadonna commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

2022-03-10 Thread GitBox


cadonna commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063898824


   @lkokhreidze The system tests passed:
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-03-10--001.system-test-kafka-branch-builder--1646904256--lkokhreidze--KAFKA-6718-part1-subscription-info-changes--3e73e5b962/report.html
   
   @showuon Could you merge this PR after your review? (Of course, only if the 
PR looks good to you)






[GitHub] [kafka] mjsax commented on a change in pull request #11875: KAFKA-13721: asymetric join-winodws should not emit spurious left/outer join results

2022-03-10 Thread GitBox


mjsax commented on a change in pull request #11875:
URL: https://github.com/apache/kafka/pull/11875#discussion_r823500646



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -211,7 +214,7 @@ private void emitNonJoinedOuterRecords(final KeyValueStore
-if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {

Review comment:
   This is the actual fix.
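   A simplified, hypothetical sketch of the condition being fixed (the method and parameter names here are invented stand-ins for the real `emitNonJoinedOuterRecords()` internals): a buffered record may only be emitted as a non-joined result once stream time has passed the point where a join partner is still possible, and for asymmetric windows that look-back bound must reflect the window's `after` side rather than a symmetric bound.

   ```java
   public class OuterEmitSketch {
       // Returns true if a record with timestamp minTime may still find a join
       // partner, i.e. it must NOT yet be emitted as a non-joined result.
       static boolean mustKeepBuffered(long minTime, long streamTime,
                                       long lookBackMs, long graceMs) {
           return minTime >= streamTime - lookBackMs - graceMs;
       }

       public static void main(String[] args) {
           // Asymmetric window: before=0, after=100, grace=0. A left record at
           // t=0 can still match right records up to t=100, so at streamTime=100
           // it must stay buffered instead of being emitted as a left-join result.
           System.out.println(mustKeepBuffered(0, 100, 100, 0));  // true  -> no spurious emit
           // Once stream time passes t=100, the record can safely be emitted.
           System.out.println(mustKeepBuffered(0, 101, 100, 0));  // false -> emit non-joined result
       }
   }
   ```

   With a too-small look-back value, the first case would return false and the record would be emitted as a spurious left-join result even though a match could still arrive.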

##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -1197,7 +1194,7 @@ public void testAsymmetricWindowingAfter() {
 joined = stream1.join(
 stream2,
 MockValueJoiner.TOSTRING_JOINER,
-JoinWindows.ofTimeDifferenceAndGrace(ofMillis(0), ofMillis(0)).after(ofMillis(100)),
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).after(ofMillis(100)),

Review comment:
   Minor side cleanup

##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##
@@ -53,9 +56,14 @@
 private final String topic1 = "topic1";
 private final String topic2 = "topic2";
 private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
-private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
-@SuppressWarnings("deprecation")
+@BeforeClass
+public static void beforeClass() {
+    PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);

Review comment:
   Side fix: we need to ensure that we never delay outputting left/outer 
join results due to wall-clock time not advancing when using the TopologyTestDriver (TTD).

##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##
@@ -749,6 +757,70 @@ public void testWindowing() {
 }
 }
 
+@Test
+public void shouldNotEmitLeftJoinResultForAsymmetricWindow() {
+    final StreamsBuilder builder = new StreamsBuilder();
+    final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+    final KStream<Integer, String> stream1;
+    final KStream<Integer, String> stream2;
+    final KStream<Integer, String> joined;
+    final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+    stream1 = builder.stream(topic1, consumed);
+    stream2 = builder.stream(topic2, consumed);
+
+    joined = stream1.leftJoin(
+        stream2,
+        MockValueJoiner.TOSTRING_JOINER,
+        JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).before(ZERO),
+        StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+    );
+    joined.process(supplier);
+
+    try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+        final TestInputTopic<Integer, String> inputTopic1 =
+            driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        final TestInputTopic<Integer, String> inputTopic2 =
+            driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+        long time = 0L;
+
+        // push two items to the primary stream; the other window is empty; this should not produce any items
+        // w1 = {}
+        // w2 = {}
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+        // --> w2 = {}
+        for (int i = 0; i < 2; i++) {
+            inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+        }
+        processor.checkAndClearProcessResult();
+
+        // push one item to the other stream; this should produce one full-join item
+        // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+        // w2 = {}
+        // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+        // --> w2 = { 0:a0 (ts: 100) }
+        time += 100L;
+        inputTopic2.pipeInput(expectedKeys[0], "a" + expectedKeys[0], time);
+
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "A0+a0", 100L)

Review comment:
   Without the fix, we get a spurious left-join result here.

##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##
@@ -877,7 +949,6 @@ private void testUpperWindowBound(final int[] expectedKeys,
 
 // push a dummy record to produce all left-join non-joined items
 time += 301L;
-driver.advanceWallClockTime(Duration.ofMillis(1000L));

Review 

[GitHub] [kafka] mjsax opened a new pull request #11875: KAFKA-13721: asymetric join-winodws should not emit spurious left/outer join results

2022-03-10 Thread GitBox


mjsax opened a new pull request #11875:
URL: https://github.com/apache/kafka/pull/11875


   Call for review @spena @guozhangwang 






[jira] [Assigned] (KAFKA-13721) Left-join still emit spurious results in stream-stream joins in some cases

2022-03-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13721:
---

Assignee: Matthias J. Sax

> Left-join still emit spurious results in stream-stream joins in some cases
> --
>
> Key: KAFKA-13721
> URL: https://issues.apache.org/jira/browse/KAFKA-13721
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nollet
>Assignee: Matthias J. Sax
>Priority: Major
>
> Stream-stream joins still seem to emit spurious results for some window 
> configurations.
> From my tests, it happened when setting before to 0 and having a grace period 
> smaller than the window duration. More precisely, it seems to happen when 
> before is set and 
> window duration > grace period + before
> h2. how to reproduce
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.TestInputTopic;
> import org.apache.kafka.streams.TestOutputTopic;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.TopologyTestDriver;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.KStream;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.time.Duration;
> import java.time.Instant;
> import java.util.Properties;
> public class SpuriousLeftJoinTest {
>     static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
>     static final Duration GRACE = Duration.ofMinutes(6);
>     static final Duration BEFORE = Duration.ZERO;
>     static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
>     static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
>     static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";
>     private static TopologyTestDriver testDriver;
>     private static TestInputTopic<String, Integer> inputTopicLeft;
>     private static TestInputTopic<String, Integer> inputTopicRight;
>     private static TestOutputTopic<String, Integer> outputTopic;
>     public static Topology createTopology() {
>         StreamsBuilder builder = new StreamsBuilder();
>         KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME);
>         KStream<String, Integer> rightStream = builder.stream(RIGHT_TOPIC_NAME);
>         // return 1 if left join matched, otherwise 0
>         KStream<String, Integer> joined = leftStream.leftJoin(
>                 rightStream,
>                 (value1, value2) -> {
>                     if (value2 == null) {
>                         return 0;
>                     }
>                     return 1;
>                 },
>                 JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
>                         .before(BEFORE)
>         );
>         joined.to(OUTPUT_TOPIC_NAME);
>         return builder.build();
>     }
>     @Before
>     public void setup() {
>         Topology topology = createTopology();
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
>         testDriver = new TopologyTestDriver(topology, props);
>         inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
>         inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
>         outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, Serdes.String().deserializer(), Serdes.Integer().deserializer());
>     }
>     @After
>     public void tearDown() {
>         testDriver.close();
>     }
>     @Test
>     public void shouldEmitOnlyOneMessageForKey1() {
>         Instant now = Instant.now();
>         inputTopicLeft.pipeInput("key1", 12, now);
>         inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION));
>         // send later record to increase stream time & close the window
>         inputTopicLeft.pipeInput("other_key", 1212122, now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10));
>         while (!outputTopic.isEmpty()) {
>             System.out.println(outputTopic.readKeyValue());
>         }
>     }
> }
> {code}
> Stdout of previous code is
> {noformat}
> KeyValue(key1, 0)
> KeyValue(key1, 1)
> {noformat}
> However it should be
> {noformat}
> KeyValue(key1, 1)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration

2022-03-10 Thread Harsha Madduri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504090#comment-17504090
 ] 

Harsha Madduri commented on KAFKA-10160:


Thanks for pointing that out, [~mimaison]. My bad, I was looking at an older 
commit 
([https://github.com/apache/kafka/pull/8921/commits/75b1c77542fe8269b89fb4f209520bb59e70ca3e])
 and expecting that to be in trunk. I just realized that the code was changed 
based on some comments. Thanks again!

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
> Fix For: 2.4.2, 2.8.0
>
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51]
> According to this, producer/consumer-level properties should be configured as, 
> e.g., somesource->sometarget.consumer.client.id. I try to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from earliest. Not sure if this is a 
> bug or my misconfiguration, but at least some update to the docs would be useful.
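A minimal sketch of how such `source->target.consumer.`-prefixed properties are typically narrowed down to plain client config (this is an illustration of the general prefix-stripping pattern, not MirrorMaker's actual implementation; the helper name is invented):

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixedConfigSketch {
    // Collects entries whose keys start with the given prefix, with the
    // prefix removed, so they can be handed to a consumer as-is.
    static Map<String, String> stripPrefix(Map<String, String> all, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("somesource->sometarget.consumer.auto.offset.reset", "latest");
        props.put("somesource->sometarget.consumer.client.id", "mm2");
        props.put("unrelated.key", "x");  // not forwarded to the consumer

        Map<String, String> consumer =
            stripPrefix(props, "somesource->sometarget.consumer.");
        System.out.println(consumer.get("auto.offset.reset"));  // latest
    }
}
```

If the override never reaches the consumer, the symptom is exactly what is described above: the client falls back to its default `auto.offset.reset`.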





[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration

2022-03-10 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504077#comment-17504077
 ] 

Mickael Maison commented on KAFKA-10160:


[~harsham] This fix is in trunk, see 
https://github.com/apache/kafka/commit/cf202cb6acf38c64a3e8b9e541673a12ee55eaaa

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
> Fix For: 2.4.2, 2.8.0
>
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51]
> According to this, producer/consumer-level properties should be configured as, 
> e.g., somesource->sometarget.consumer.client.id. I try to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from earliest. Not sure if this is a 
> bug or my misconfiguration, but at least some update to the docs would be useful.


