[jira] [Resolved] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception

2022-09-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13840.
---
Resolution: Fixed

> KafkaConsumer is unable to recover connection to group coordinator after 
> commitOffsetsAsync exception
> -
>
> Key: KAFKA-13840
> URL: https://issues.apache.org/jira/browse/KAFKA-13840
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
>Reporter: Kyle R Stehbens
>Assignee: Luke Chen
>Priority: Critical
>
> Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a 
> timeout or any other retriable exception triggered during an async offset 
> commit renders the client unable to recover its group coordinator and 
> leaves the client in a broken state.
>  
> I first encountered this using v2.8.1 of the Java client and, after going 
> through the code base for all versions of the client, found that it affects 
> every client version from 2.6.1 onward.
> I also confirmed that after rolling back to 2.5.1, the issue is not present.
>  
> The issue stems from a change in the FindCoordinatorResponseHandler: in 
> 2.5.1 it called clearFindCoordinatorFuture() on both success and failure 
> here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>  
> In all later versions of the client this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>  
> As a result, when the KafkaConsumer calls 
> coordinator.commitOffsetsAsync(...) and an error occurs such that the 
> coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>  
> the client will then try to call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However, this can never succeed because it perpetually returns a reference 
> to a failed future (findCoordinatorFuture) that is never cleared out.
>  
> This manifests as every subsequent call to commitOffsetsAsync() throwing a 
> "coordinator unavailable" exception forever after any retriable exception 
> causes the coordinator to close. 
> Note we discovered this when we upgraded the Kafka client in our Flink 
> consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the 
> client. We also noticed it occurring in our non-Flink Java consumers running 
> 3.x client versions.
>  
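For readers skimming the thread, here is a minimal, simplified sketch of the caching
pattern described above. It is not the actual AbstractCoordinator code; the class and
helper names are illustrative, and it only shows why clearing the cached future on both
success and failure (the 2.5.1 behavior) matters:

import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: a lookup helper that caches its in-flight future.
// If the cached future completes exceptionally and is never cleared (as the
// report describes for findCoordinatorFuture from 2.6.1 onward), every later
// lookup keeps returning the same failed future and recovery never happens.
public class CoordinatorLookupSketch {
    private CompletableFuture<Void> findCoordinatorFuture;

    public synchronized CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            findCoordinatorFuture = sendFindCoordinatorRequest();
            // 2.5.1-style behavior: clear the cached future on completion,
            // whether it succeeded or failed, so the next call retries.
            findCoordinatorFuture.whenComplete((v, t) -> clearFindCoordinatorFuture());
        }
        return findCoordinatorFuture;
    }

    private synchronized void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }

    private CompletableFuture<Void> sendFindCoordinatorRequest() {
        // Placeholder for a network round trip that may fail with a retriable error.
        return new CompletableFuture<>();
    }
}

Without the whenComplete callback clearing the cached future, a single failed lookup is
handed back to every later caller, which matches the "coordinator unavailable forever"
symptom described in the report.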



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: KafkaConsumer refactor proposal

2022-09-15 Thread Guozhang Wang
Hello Philip,

Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
this as a 1-pager instead of a KIP is that, so far, all of the implementation
should be internal and hence should not impact the public APIs. If, e.g.,
we find during implementation that some public interfaces, like metrics,
need to be modified, then we will send out a separate KIP proposal thread
for that, right?

I made a pass on the 1-pager and also some of the ongoing developments,
here are just some high-level comments:

On the 1-pager:

1) I think it's better to clarify the scenarios under manual assignment,
a.k.a. `consumer.assign()`. E.g., does the background thread still always
try to discover the coordinator even if there are no commit requests? (Note
that, if yes, this would be a behavioral change, since some users like
Connect may rely on the consumer never trying to discover the coordinator
with manual assignment and no commits to brokers.) From the description it
seems that if there are no events from the channel requesting offset
commits, it would not try to discover the coordinator, but then the
background thread's state would be in `down` or `initialized`, not in
`stable`, and hence we should also allow a transition from `initialized`
to `down` directly, right?
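
(For reference, the manual-assignment pattern in question looks roughly like the sketch
below -- broker address, topic name, and config values are placeholders -- with
auto-commit disabled, no group.id, and no explicit commits, which is the case where
users may rely on the client never contacting a group coordinator:)

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignmentSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("enable.auto.commit", "false");                  // no commits, no group membership
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no subscribe(), no rebalances, and -- today -- no need
            // for the client to contact a group coordinator (the behavior question 1 is about).
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            consumer.seekToBeginning(consumer.assignment());
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}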

2) From the polling thread, besides the `poll` function changes, I think
it's better to also state other blocking function changes like commitSync
as well. I'd assume that, e.g., commitSync would be implemented as:

* Send the commit-request to the server-event channel
* Continue polling from the consumer event channel, but skip other events
like rebalance-listener events (we still need to bookkeep them for the next
`poll` call, but we just cannot trigger them here since that would break
compatibility) until we receive the commit response event.

Some details about how those functions would be implemented would also be
very helpful for the community's audience.
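
To make the commitSync sketch above concrete, here is roughly what I have in mind,
written against hypothetical event and channel types (the names are made up; the real
implementation would use the actual event classes and proper error handling):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event types and channels, only to illustrate the control flow.
public class CommitSyncSketch {
    interface BackgroundEvent {}
    static final class CommitResponseEvent implements BackgroundEvent {}
    static final class RebalanceListenerEvent implements BackgroundEvent {}
    static final class CommitRequestEvent {}

    private final BlockingQueue<CommitRequestEvent> requestChannel = new LinkedBlockingQueue<>();
    private final BlockingQueue<BackgroundEvent> responseChannel = new LinkedBlockingQueue<>();
    // Events we must not trigger here; they are bookkept for the next poll() call.
    private final Queue<BackgroundEvent> deferredEvents = new ArrayDeque<>();

    public void commitSync(long timeoutMs) throws InterruptedException {
        requestChannel.put(new CommitRequestEvent());   // 1. hand the commit to the background thread
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            BackgroundEvent event =
                responseChannel.poll(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            if (event == null) {
                throw new RuntimeException("Timed out waiting for the commit response");
            }
            if (event instanceof CommitResponseEvent) { // 2. commit completed
                return;
            }
            deferredEvents.add(event);                  // 3. e.g. rebalance-listener events: keep, don't fire
        }
        throw new RuntimeException("Timed out waiting for the commit response");
    }
}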

3) I have a few questions from the rebalance state machine section:

3.a) You mentioned in the state machine:

"Wait for the partition revoked event, and advance the state to
PARTITIONS_REVOKED"
"Wait for partition assignment completion from the polling thread.  Advance
to PARTITIONS_ASSIGNED"

But we do not have those states today; I think you meant to say
"PREPARE_REBALANCING" and "STABLE"?

3.b) Also, it seems that the proposed state transition would be Stable ->
Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
Assigning_Partitions -> Stable (for the eager protocol at least), but when
auto commit is enabled, we also need to commit offsets for the revoked
partitions, and the auto-committing happens before the
`onPartitionsRevoked` listener is triggered, so should auto-committing be
part of the `Revoking_Partitions` state as well?
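
Just to make sure we are reading the same sequence, question 3.b spelled out as a plain
enum (purely illustrative, not code from the POC):

// The eager-protocol transition sequence as I read it from the 1-pager,
// with the open question of where auto-committing of revoked offsets fits.
enum RebalanceStateSketch {
    STABLE,
    REVOKING_PARTITIONS,    // auto-commit offsets of revoked partitions here? (question 3.b)
    PREPARE_REBALANCING,
    COMPLETE_REBALANCING,
    ASSIGNING_PARTITIONS;   // then back to STABLE
}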

4) Could you expand on the "Exceptions thrown will be different."
description and list all changes to the exceptions (note that, if they do
exist, we also need a KIP). For example, besides WakeupException, Kafka
also has an InterruptException (different from Java's own
InterruptedException) defined in the public APIs; are we going to change
which functions would throw and which would not?

5) In the testing plan:

5.a) Could you elaborate a bit more on "We need to make sure the timing of
the 1.  coordinator discovery and 2.  joinGroup operations are being done
in the correct timing."

5.b) I'd also add that for all the blocking APIs, including `poll`, where a
timeout value is provided either as a param or implicitly from
`default.api.timeout.ms`, the timeout would now be strictly respected --- to
me this is also one of the key motivations of this refactoring :)

-

And the current POC PRs:

1. Just a quick thought about the naming of `KafkaServerEventQueue`: I
agree with others that it may be confusing to include `KafkaServer` in a
client class name. What about renaming it to `ConsumerRequestEventQueue`
and `ConsumerResponseEventQueue`? I know it sounds a bit awkward to have
the `ResponseEventQueue` also return rebalance-listener-triggering events,
but that may be less confusing.

2. I'd suggest that for new modules like `ConsumerBackgroundThread`, we
first define an interface in the `internals` package, e.g.
`RequestEventHandler` (assuming the previous rename suggestion), and then
have a `DefaultRequestEventHandler` implementation class which encapsulates
the background thread. This enables us to easily write unit tests that
isolate other modules, especially with concurrent threading.
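
Roughly something along these lines (a sketch only, using the hypothetical names from
the rename suggestion above):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical interface/implementation split, to illustrate the suggestion:
// callers depend on the interface, so unit tests can mock it without spinning
// up a real background thread.
interface RequestEventHandler extends AutoCloseable {
    void add(Object requestEvent);   // enqueue a request event for the background thread
    @Override
    void close();
}

class DefaultRequestEventHandler implements RequestEventHandler {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread backgroundThread = new Thread(this::runLoop, "consumer-background-thread");

    DefaultRequestEventHandler() {
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    @Override
    public void add(Object requestEvent) {
        queue.add(requestEvent);
    }

    private void runLoop() {
        while (running.get()) {
            try {
                Object event = queue.poll(100, TimeUnit.MILLISECONDS);
                if (event != null) {
                    // network I/O, coordinator discovery, etc. would happen here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void close() {
        running.set(false);
        backgroundThread.interrupt();
    }
}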

3. For `KafkaServerEventType`: where would NOOP be used? Also, I think
there are other types, like FETCH_COMMITTED, as well?



Thanks,
Guozhang


On Tue, Sep 13, 2022 at 2:14 PM Philip Nee  wrote:

> Hi all,
>
> Here is the proposal to refactor the Kafka Consumer:
> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
> The 1-pager is on the wiki, so please take a look at it.  Also, this

Re: KafkaConsumer refactor proposal

2022-09-15 Thread Kirk True
Hi Philip!

Thanks for the write-up.

On Tue, Sep 13, 2022, at 2:13 PM, Philip Nee wrote:
> Hi all,
> 
> Here is the proposal to refactor the Kafka Consumer
> .
> The 1-pager is on the wiki, so please take a look at it.  Also, this is a
> prerequisite for KIP-848 (the next gen rebalance protocol).

I only have time for a quick read-through, but here are some initial 
questions/comments:

 1. The third bullet point in the "Public-Facing Changes" section says that the 
"exception[s] thrown will be different." Can you provide some more context on 
that? Will this affect user applications that attempt to handle exceptions?
 2. Under "Scope" it mentions that the proposal is to "remove some blocking 
methods, such as commitOffsetSync." Are you referring to the 
Consumer.commitSync() method or something else?
 3. I like how the proposal will address the systemic issues with the current 
consumer 
(https://issues.apache.org/jira/issues/?jql=labels%20%3D%20new-consumer-threading-should-fix).
 Is there a specific set of those Jiras that will be fixed/resolved, or is it 
'best effort'?
 4. "A note on the *subscriptionState*: Its reference will be shared by polling 
and background threads." Sharing this reference implies locking of some sort, 
yes?
 5. Can you elaborate on this sentence: "We need to make sure the timing of the 
1.  coordinator discovery and 2.  joinGroup operations are being done in the 
correct timing."?
 6. Does this new implementation for the consumer internals live alongside the 
current implementation in the code base? How does a user opt-in to the "new" 
implementation?
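
Regarding question 4: below is a rough sketch of the kind of shared access I mean. It is
purely hypothetical -- not the actual SubscriptionState class -- and coarse-grained
synchronization is only one option; the proposal might prefer finer-grained locks or
immutable snapshots instead.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the shared subscription state: both the polling
// thread and the background thread read/update positions, so access here is
// synchronized on the object itself.
class SharedSubscriptionStateSketch {
    private final Map<String, Long> positions = new HashMap<>();

    public synchronized void updatePosition(String topicPartition, long offset) {
        positions.put(topicPartition, offset);   // called from the background thread
    }

    public synchronized Long position(String topicPartition) {
        return positions.get(topicPartition);    // called from the polling thread
    }
}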

> 
> Cheers,
> P
> 

Thanks!
Kirk

Re: [DISCUSS] KIP-864: Add End-To-End Latency Metrics to Connectors

2022-09-15 Thread Jorge Esteban Quilcate Otoya
Hi everyone,

I've made a slight addition to the KIP based on Yash's feedback:

- A new metric is added at INFO level to record the max latency from the
batch timestamp, by keeping the oldest record timestamp per batch.
- A draft implementation is linked.
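
For illustration, the gist of the new INFO-level metric is sketched below: keep the
oldest record timestamp seen in the batch and call Sensor::record once per batch with
the max latency. The sensor and metric names here are placeholders, not the ones in the
KIP; the linked draft PR has the real wiring.

import java.util.Collection;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.connect.sink.SinkRecord;

// Sketch of the idea behind the INFO-level metric: track the oldest record
// timestamp in each batch and record the max end-to-end latency per batch.
class BatchLatencySketch {
    private final Sensor maxBatchLatency;

    BatchLatencySketch(Metrics metrics) {
        maxBatchLatency = metrics.sensor("sink-record-batch-latency", Sensor.RecordingLevel.INFO);
        maxBatchLatency.add(
            metrics.metricName("sink-record-batch-max-latency-ms", "sketch-group",
                "Max latency of a batch, measured from its oldest record timestamp"),
            new Max());
    }

    void onBatch(Collection<SinkRecord> batch, long nowMs) {
        long oldestTimestamp = Long.MAX_VALUE;
        for (SinkRecord record : batch) {
            if (record.timestamp() != null) {
                oldestTimestamp = Math.min(oldestTimestamp, record.timestamp());
            }
        }
        if (oldestTimestamp != Long.MAX_VALUE) {
            maxBatchLatency.record(nowMs - oldestTimestamp);  // one record() call per batch
        }
    }
}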

Looking forward to your feedback.
Also, a kind reminder that the vote thread is open.

Thanks!
Jorge.

On Thu, 8 Sept 2022 at 14:25, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Great. I have updated the KIP to reflect this.
>
> Cheers,
> Jorge.
>
> On Thu, 8 Sept 2022 at 12:26, Yash Mayya  wrote:
>
>> Thanks, I think it makes sense to define these metrics at a DEBUG
>> recording
>> level.
>>
>> On Thu, Sep 8, 2022 at 2:51 PM Jorge Esteban Quilcate Otoya <
>> quilcate.jo...@gmail.com> wrote:
>>
>> > On Thu, 8 Sept 2022 at 05:55, Yash Mayya  wrote:
>> >
>> > > Hi Jorge,
>> > >
>> > > Thanks for the changes. With regard to having per batch vs per record
>> > > metrics, the additional overhead I was referring to wasn't about
>> whether
>> > or
>> > > not we would need to iterate over all the records in a batch. I was
>> > > referring to the potential additional overhead caused by the higher
>> > volume
>> > > of calls to Sensor::record on the sensors for the new metrics (as
>> > compared
>> > > to the existing batch only metrics), especially for high throughput
>> > > connectors where batch sizes could be large. I guess we may want to do
>> > some
>> > > sort of performance testing and get concrete numbers to verify whether
>> > this
>> > > is a valid concern or not?
>> > >
>> >
>> > 6.1. Got it, thanks for clarifying. I guess there could be a benchmark
>> test
>> > of the `Sensor::record` to get an idea of the performance impact.
>> > Regardless, the fact that these are single-record metrics compared to
>> > existing batch-only could be explicitly defined by setting these
>> metrics at
>> > a DEBUG or TRACE metric recording level, leaving the existing at INFO
>> > level.
>> > wdyt?
>> >
>> >
>> > >
>> > > Thanks,
>> > > Yash
>> > >
>> > > On Tue, Sep 6, 2022 at 4:42 PM Jorge Esteban Quilcate Otoya <
>> > > quilcate.jo...@gmail.com> wrote:
>> > >
>> > > > Hi Sagar and Yash,
>> > > >
>> > > > > the way it's defined in
>> > > > https://kafka.apache.org/documentation/#connect_monitoring for the
>> > > metrics
>> > > >
>> > > > 4.1. Got it. Add it to the KIP.
>> > > >
>> > > > > The only thing I would argue is do we need
>> sink-record-latency-min?
>> > > Maybe
>> > > > we
>> > > > > could remove this min metric as well and make all of the 3 e2e
>> > metrics
>> > > > > consistent
>> > > >
>> > > > 4.2 I see. Will remove it from the KIP.
>> > > >
>> > > > > Probably users can track the metrics at their end to
>> > > > > figure that out. Do you think that makes sense?
>> > > >
>> > > > 4.3. Yes, agree. With these new metrics it should be easier for
>> users
>> > to
>> > > > track this.
>> > > >
>> > > > > I think it makes sense to not have a min metric for either to
>> remain
>> > > > > consistent with the existing put-batch and poll-batch metrics
>> > > >
>> > > > 5.1. Got it. Same as 4.2
>> > > >
>> > > > > Another naming related suggestion I had was with the
>> > > > > "convert-time" metrics - we should probably include
>> transformations
>> > in
>> > > > the
>> > > > > name since SMTs could definitely be attributable to a sizable
>> chunk
>> > of
>> > > > the
>> > > > > latency depending on the specific transformation chain.
>> > > >
>> > > > 5.2. Makes sense. I'm proposing to add
>> > `sink-record-convert-transform...`
>> > > > and `source-record-transform-convert...` to represent correctly the
>> > order
>> > > > of operations.
>> > > >
>> > > > > it seems like both source and sink tasks only record metrics at a
>> > > "batch"
>> > > > > level, not on an individual record level. I think it might be
>> > > additional
>> > > > > overhead if we want to record these new metrics all at the record
>> > > level?
>> > > >
>> > > > 5.3. I considered at the beginning to implement all metrics at the
>> > batch
>> > > > level, but given how the framework process records, I fallback to
>> the
>> > > > proposed approach:
>> > > > - Sink Task:
>> > > >   - `WorkerSinkTask#convertMessages(msgs)` already iterates over
>> > records,
>> > > > so there is no additional overhead to capture record latency per
>> > record.
>> > > > -
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L490-L514
>> > > >   - `WorkerSinkTask#convertAndTransformRecord(record)` actually
>> happens
>> > > > individually. Measuring this operation per batch would include
>> > processing
>> > > > that is not strictly part of "convert and transform"
>> > > > -
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker

[jira] [Created] (KAFKA-14235) Document MM2 auto topic creation behavior for MM2-internal topics

2022-09-15 Thread Maruthi (Jira)
Maruthi created KAFKA-14235:
---

 Summary: Document MM2 auto topic creation behavior for 
MM2-internal topics
 Key: KAFKA-14235
 URL: https://issues.apache.org/jira/browse/KAFKA-14235
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Maruthi


Currently, there is no way to disable the creation of MM2-internal topics, even when 
the broker and Connect auto-topic-creation configs are set to false.

Based on [https://lists.apache.org/thread/px5d1394mzkog7zhp96h80l0knk0n77d], 
this should at least be documented, if not fixed, to avoid confusion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1229

2022-09-15 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 504948 lines...]
[2022-09-15T16:56:55.687Z] KStreamAggregationIntegrationTest > 
shouldReduce(TestInfo) PASSED
[2022-09-15T16:56:55.687Z] 
[2022-09-15T16:56:55.687Z] KStreamAggregationIntegrationTest > 
shouldAggregate(TestInfo) STARTED
[2022-09-15T16:56:59.691Z] 
[2022-09-15T16:56:59.691Z] KStreamAggregationIntegrationTest > 
shouldAggregate(TestInfo) PASSED
[2022-09-15T16:56:59.691Z] 
[2022-09-15T16:56:59.691Z] KStreamAggregationIntegrationTest > 
shouldCount(TestInfo) STARTED
[2022-09-15T16:57:03.719Z] 
[2022-09-15T16:57:03.719Z] KStreamAggregationIntegrationTest > 
shouldCount(TestInfo) PASSED
[2022-09-15T16:57:03.719Z] 
[2022-09-15T16:57:03.719Z] KStreamAggregationIntegrationTest > 
shouldGroupByKey(TestInfo) STARTED
[2022-09-15T16:57:07.584Z] 
[2022-09-15T16:57:07.584Z] KStreamAggregationIntegrationTest > 
shouldGroupByKey(TestInfo) PASSED
[2022-09-15T16:57:07.584Z] 
[2022-09-15T16:57:07.584Z] KStreamAggregationIntegrationTest > 
shouldCountWithInternalStore(TestInfo) STARTED
[2022-09-15T16:57:12.557Z] 
[2022-09-15T16:57:12.557Z] KStreamAggregationIntegrationTest > 
shouldCountWithInternalStore(TestInfo) PASSED
[2022-09-15T16:57:12.557Z] 
[2022-09-15T16:57:12.557Z] KStreamAggregationIntegrationTest > 
shouldCountUnlimitedWindows() STARTED
[2022-09-15T16:57:13.479Z] 
[2022-09-15T16:57:13.479Z] KStreamAggregationIntegrationTest > 
shouldCountUnlimitedWindows() PASSED
[2022-09-15T16:57:13.479Z] 
[2022-09-15T16:57:13.479Z] KStreamAggregationIntegrationTest > 
shouldReduceWindowed(TestInfo) STARTED
[2022-09-15T16:57:18.078Z] 
[2022-09-15T16:57:18.078Z] KStreamAggregationIntegrationTest > 
shouldReduceWindowed(TestInfo) PASSED
[2022-09-15T16:57:18.078Z] 
[2022-09-15T16:57:18.078Z] KStreamAggregationIntegrationTest > 
shouldCountSessionWindows() STARTED
[2022-09-15T16:57:19.000Z] 
[2022-09-15T16:57:19.000Z] KStreamAggregationIntegrationTest > 
shouldCountSessionWindows() PASSED
[2022-09-15T16:57:19.000Z] 
[2022-09-15T16:57:19.000Z] KStreamAggregationIntegrationTest > 
shouldAggregateWindowed(TestInfo) STARTED
[2022-09-15T16:57:22.993Z] 
[2022-09-15T16:57:22.993Z] KStreamAggregationIntegrationTest > 
shouldAggregateWindowed(TestInfo) PASSED
[2022-09-15T16:57:30.331Z] streams-3: SMOKE-TEST-CLIENT-CLOSED
[2022-09-15T16:57:30.331Z] streams-2: SMOKE-TEST-CLIENT-CLOSED
[2022-09-15T16:57:30.331Z] streams-0: SMOKE-TEST-CLIENT-CLOSED
[2022-09-15T16:57:30.331Z] streams-1: SMOKE-TEST-CLIENT-CLOSED
[2022-09-15T16:57:30.331Z] streams-4: SMOKE-TEST-CLIENT-CLOSED
[2022-09-15T16:57:30.331Z] streams-5: SMOKE-TEST-CLIENT-CLOSED
[2022-09-15T16:57:31.759Z] 
[2022-09-15T16:57:31.759Z] FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() PASSED
[2022-09-15T16:57:31.759Z] 
[2022-09-15T16:57:31.759Z] FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest()
 STARTED
[2022-09-15T16:57:36.058Z] 
[2022-09-15T16:57:36.058Z] FAILURE: Build failed with an exception.
[2022-09-15T16:57:36.058Z] 
[2022-09-15T16:57:36.058Z] * What went wrong:
[2022-09-15T16:57:36.058Z] Execution failed for task ':core:unitTest'.
[2022-09-15T16:57:36.058Z] > Process 'Gradle Test Executor 136' finished with 
non-zero exit value 1
[2022-09-15T16:57:36.058Z]   This problem might be caused by incorrect test 
process configuration.
[2022-09-15T16:57:36.058Z]   Please refer to the test execution section in the 
User Manual at 
https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
[2022-09-15T16:57:36.058Z] 
[2022-09-15T16:57:36.058Z] * Try:
[2022-09-15T16:57:36.058Z] > Run with --stacktrace option to get the stack 
trace.
[2022-09-15T16:57:36.058Z] > Run with --info or --debug option to get more log 
output.
[2022-09-15T16:57:36.058Z] > Run with --scan to get full insights.
[2022-09-15T16:57:36.058Z] 
[2022-09-15T16:57:36.058Z] * Get more help at https://help.gradle.org
[2022-09-15T16:57:36.058Z] 
[2022-09-15T16:57:36.058Z] BUILD FAILED in 2h 47m 27s
[2022-09-15T16:57:36.058Z] 212 actionable tasks: 115 executed, 97 up-to-date
[2022-09-15T16:57:36.058Z] 
[2022-09-15T16:57:36.058Z] See the profiling report at: 
file:///home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk@2/build/reports/profile/profile-2022-09-15-14-10-16.html
[2022-09-15T16:57:36.058Z] A fine-grained performance profile is available: use 
the --scan option.
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // node
[Pipeline] }
[Pipeline] // timestamps
[Pipeline] }
[Pipeline] // timeout
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
Failed in branch JDK 11 and Scala 2.13
[2022-09-15T16:58:21.774Z] 
[2022-09-15T16:58:21.774Z] FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSp

[jira] [Resolved] (KAFKA-13632) MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered records

2022-09-15 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13632.
---
Resolution: Fixed

> MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered 
> records
> 
>
> Key: KAFKA-13632
> URL: https://issues.apache.org/jira/browse/KAFKA-13632
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.1.0
>Reporter: Bert Baron
>Priority: Minor
>
> We have a setup where we filter records with MirrorMaker 2.0 (see below). 
> This results in the following warning messages, caused by NPEs in 
> MirrorSourceTask.commitRecord, for each filtered record:
> {code:java}
> [2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure 
> committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
>   at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
>   at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
>   at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829) {code}
> The reason seems to be that for filtered records the metadata is null. Note 
> that the javadoc of the overridden SourceTask.commitRecord clearly states that 
> metadata will be null if the record was filtered.
> In our case we use a custom predicate, but the issue can be reproduced with 
> the following configuration:
> {code:java}
> clusters = source,target
> tasks.max = 1
> source.bootstrap.servers = 
> target.bootstrap.servers = 
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> source->target.enabled = true
> source->target.topics = topic1
> source->target.transforms=Filter
> source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
> source->target.transforms.Filter.predicate=HeaderPredicate
> source->target.predicates=HeaderPredicate
> source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
> source->target.predicates.HeaderPredicate.name=someheader
>  {code}
> Each record with the header key 'someheader' will result in the NPE and 
> warning message.
> On a side note, we couldn't find clear documentation on how to configure 
> (SMT) filtering with MirrorMaker 2 or whether this is supported at all, but 
> apart from the NPEs and warning messages this seems to work for us 
> functionally with our custom filter.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13985) MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record

2022-09-15 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13985.
---
Resolution: Fixed

> MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record
> --
>
> Key: KAFKA-13985
> URL: https://issues.apache.org/jira/browse/KAFKA-13985
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.1.0, 3.2.0, 3.3.0
>Reporter: Jacopo Riciputi
>Assignee: Rens Groothuijsen
>Priority: Minor
> Fix For: 3.4.0
>
>
> Applying an SMT that filters out messages can lead to entering this path:
> From WorkerSourceTask.java
> {code:java}
> final SourceRecord record = transformationChain.apply(preTransformRecord);
> final ProducerRecord producerRecord = 
> convertTransformedRecord(record);
> if (producerRecord == null || retryWithToleranceOperator.failed()) {
>     counter.skipRecord();
>     commitTaskRecord(preTransformRecord, null);
>     continue;
> } {code}
>  
> Then to:
> {code:java}
> private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
>         try {
>             task.commitRecord(record, metadata);
>         } catch (Throwable t) {
>             log.error("{} Exception thrown while calling 
> task.commitRecord()", this, t);
>         }
> }{code}
> Finally
> From MirrorSourceTask.java
> {code:java}
>     @Override
>     public void commitRecord(SourceRecord record, RecordMetadata metadata) {
>         try {
>             if (stopping) {
>                 return;
>             }
>             if (!metadata.hasOffset()) {
>                 log.error("RecordMetadata has no offset -- can't sync offsets 
> for {}.", record.topic());
>                 return;
>             }
> ...{code}
>  
> This causes an NPE because metadata is null. 
> Here is the exception:
> {code:java}
> [2022-06-13 12:31:33,094] WARN Failure committing record. 
> (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
> java.lang.NullPointerException
>     at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
>     at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
>     at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
>     at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source)
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>     at java.base/java.lang.Thread.run(Unknown Source) {code}
> In my understanding this is handled well and does not have negative 
> impacts, because the exception thrown by MirrorSourceTask.commitRecord is 
> caught and not forwarded any further. 
> But it would probably be preferable to handle this by checking whether 
> metadata != null, skipping the commit safely and silently.
> [EDIT]
> Actually, going a bit deeper, there is a small side effect.
> If the latest message processed was filtered out (so not committed by 
> MirrorSourceTask) and the MM2 instance is rebooted, this message will be 
> re-read by the consumer, because its offset was not committed (and it will 
> probably be filtered out again if the configuration hasn't changed).
> But this behavior is probably fine considering MM2's nature.
>  
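For what it's worth, the guard suggested above would look something like the following
(a simplified sketch of the commitRecord method quoted earlier, not the final fix):

@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
    if (stopping) {
        return;
    }
    // Filtered records arrive with null metadata per the SourceTask#commitRecord
    // contract, so skip offset syncing quietly instead of throwing an NPE.
    if (metadata == null) {
        log.debug("No RecordMetadata (record was probably filtered out); skipping offset sync for {}.",
            record.topic());
        return;
    }
    if (!metadata.hasOffset()) {
        log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
        return;
    }
    // ... continue with offset syncing as before
}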



--
This message was sent by Atlassian Jira
(v8.20.10#820010)