[jira] [Resolved] (KAFKA-9720) Update gradle to 6.0+

2020-05-17 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-9720.

  Assignee: Ismael Juma
Resolution: Fixed

> Update gradle to 6.0+ 
> --
>
> Key: KAFKA-9720
> URL: https://issues.apache.org/jira/browse/KAFKA-9720
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: David Arthur
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.6.0
>
>
> Gradle 6.x has been out for a few months and has a few bug fixes and 
> performance improvements. 
> * https://docs.gradle.org/6.0/release-notes.html
> * https://docs.gradle.org/6.1/release-notes.html
> * https://docs.gradle.org/6.2/release-notes.html
> We should consider updating the build to the latest version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Help! Can't add reviewers for Github Kafka PR

2020-05-17 Thread Matthias J. Sax
Well, anybody familiar with the code can be a reviewer!

Of course, a committer needs to merge the PR in the end. But committers
are overloaded with reviews, and if others help reviewing, it reduces
their workload.


-Matthias


On 5/17/20 3:36 PM, John Roesler wrote:
> Ah, I just reread my message. I meant “committer”, of course.
> 
> Thanks,
> John
> 
> On Sun, May 17, 2020, at 16:10, Kowshik Prakasam wrote:
>> Thanks, John!
>>
>>
>> Cheers,
>> Kowshik
>>
>> On Sun, May 17, 2020 at 8:12 AM John Roesler  wrote:
>>> Hi Kowshik,
>>>
>>>  You just have to “@“ mention the username of the person you want in a gh 
>>> comment. I think you have to be a committee to add labels, reviewers, etc. 
>>>
>>>  Hope this helps!
>>>  -John
>>>
>>>  On Sun, May 17, 2020, at 04:11, Kowshik Prakasam wrote:
>>>  > Hi all,
>>>  > 
>>>  > My intent is to create a PR for review in 
>>> https://github.com/apache/kafka .
>>>  > However I find that I'm unable to add reviewers to my PR. Does this need
>>>  > any specific permissions? If so, please could someone grant me access or
>>>  > help me understand what I need to do to get permissions to add reviewers?
>>>  > 
>>>  > 
>>>  > Cheers,
>>>  > Kowshik
>>>  >



signature.asc
Description: OpenPGP digital signature


[jira] [Created] (KAFKA-10012) Reducing memory overhead associated with strings in MetricName

2020-05-17 Thread Navina Ramesh (Jira)
Navina Ramesh created KAFKA-10012:
-

 Summary: Reducing memory overhead associated with strings in 
MetricName
 Key: KAFKA-10012
 URL: https://issues.apache.org/jira/browse/KAFKA-10012
 Project: Kafka
  Issue Type: Improvement
  Components: network
Reporter: Navina Ramesh


{{SelectorMetrics}} has per-connection metrics, which means the number of 
{{MetricName}} objects and the strings associated with them (such as group name 
and description) grows with the number of connections in the client. This 
overhead of duplicate string objects is amplified when there are multiple 
instances of Kafka clients within the same JVM. 

This patch addresses some of the memory overhead by making {{metricGrpName}} a 
constant and introducing a new constant {{perConnectionMetricGrpName}}. 
Additionally, the strings for metric name and description in {{createMeter}} 
have been interned since there are about 8 per-client and 4 per-connection 
{{Meter}} instances.
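
As a rough illustration of the interning idea (hypothetical names below, not the actual patch):

{code:java}
// Hypothetical sketch of the string-interning idea described above; not the actual Kafka patch.
public final class MetricStringInterningSketch {

    // Group names kept as constants instead of being rebuilt for every connection.
    static final String METRIC_GROUP_NAME = "example-metrics";
    static final String PER_CONNECTION_METRIC_GROUP_NAME = "example-node-metrics";

    // String.intern() returns the canonical instance from the JVM string pool, so multiple
    // client instances in the same JVM that build identical metric descriptions end up
    // sharing one String object instead of holding duplicate copies.
    static String metricDescription(String base) {
        return ("Description of " + base).intern();
    }
}
{code}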



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka-site] showuon commented on pull request #265: MINOR: Update stream documentation for version 2.5

2020-05-17 Thread GitBox


showuon commented on pull request #265:
URL: https://github.com/apache/kafka-site/pull/265#issuecomment-629911823


   Same change as the PR to the `kafka` repo: 
https://github.com/apache/kafka/pull/8622/ per @bbejeck's request. Thanks. :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Build failed in Jenkins: kafka-trunk-jdk11 #1471

2020-05-17 Thread Apache Jenkins Server
See 


Changes:

[github] Update Gradle to 6.4.1 (#8678)


--
[...truncated 4.86 MB...]

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionIneligibleToOnlineReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionStartedToNewReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionStartedToNewReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToReplicaDeletionSuccessfulTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToReplicaDeletionSuccessfulTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testNewReplicaToOnlineReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testNewReplicaToOnlineReplicaTransition PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerDetectsBouncedBrokers STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerDetectsBouncedBrokers PASSED

kafka.controller.ControllerIntegrationTest > testControlledShutdown STARTED

kafka.controller.ControllerIntegrationTest > testControlledShutdown PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentWithOfflineReplicaHaltingProgress PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerEpochPersistsWhenAllBrokersDown PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicCreationWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline STARTED

kafka.controller.ControllerIntegrationTest > 
testPartitionReassignmentResumesAfterReplicaComesOnline PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled PASSED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testTopicPartitionExpansionWithOfflineReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica STARTED

kafka.controller.ControllerIntegrationTest > 
testPreferredReplicaLeaderElectionWithOfflinePreferredReplica PASSED

kafka.controller.ControllerIntegrationTest > 
testMetadataPropagationOnControlPlane STARTED

kafka.controller.ControllerIntegrationTest > 
testMetadataPropagationOnControlPlane PASSED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection STARTED

kafka.controller.ControllerIntegrationTest > 
testAutoPreferredReplicaLeaderElection PASSED

kafka.controller.ControllerIntegrationTest > testTopicCreation STARTED

kafka.controller.ControllerIntegrationTest > testTopicCreation PASSED

kafka.controller.ControllerIntegrationTest > testControllerMoveOnTopicDeletion 
STARTED

kafka.controller.ControllerIntegrationTest > testControllerMoveOnTopicDeletion 
PASSED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment STARTED

kafka.controller.ControllerIntegrationTest > testPartitionReassignment PASSED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion STARTED

kafka.controller.ControllerIntegrationTest > testTopicPartitionExpansion PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveIncrementsControllerEpoch PASSED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled STARTED

kafka.controller.ControllerIntegrationTest > 
testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveOnPartitionReassignment STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerMoveOnPartitionReassignment PASSED

kafka.controller.ControllerIntegrationTest > testControllerMoveOnTopicCreation 
STARTED

kafka.controller.ControllerIntegrationTest > testControllerMoveOnTopicCreation 
PASSED

kafka.controller.ControllerIntegrationTest > 
testControllerRejectControlledShutdownRequestWithStaleBrokerEpoch STARTED

kafka.controller.ControllerIntegrationTest > 
testControllerRejectControlledShutdownRequestWithStaleBrokerEpoch PASSED

kafka.controller.ControllerIntegrationTest > 
testBackToBackPreferredReplicaLeaderElections STARTED


[GitHub] [kafka-site] showuon opened a new pull request #265: Update stream documentation

2020-05-17 Thread GitBox


showuon opened a new pull request #265:
URL: https://github.com/apache/kafka-site/pull/265


   1. fix broken links
   2. rephrase a sentence
   3. update the version number



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [DISCUSS] KIP-602 - Change default value for client.dns.lookup

2020-05-17 Thread Badai Aqrandista
Ismael

What do you think of the PR and the explanation regarding the issue raised
in KIP-235?

Should I go ahead and build a proper PR?

Thanks
Badai

On Mon, May 11, 2020 at 8:53 AM Badai Aqrandista  wrote:

> Ismael
>
> PR created: https://github.com/apache/kafka/pull/8644/files
>
> Also, as this is my first PR, please let me know if I missed anything.
>
> Thanks
> Badai
>
> On Mon, May 11, 2020 at 8:19 AM Badai Aqrandista 
> wrote:
>
>> Ismael
>>
>> Thank you for responding.
>>
>> KIP-235 modified ClientUtils#parseAndValidateAddresses [1] to resolve an
>> address alias (i.e. bootstrap server) into multiple addresses. This is why
>> it would break SSL hostname verification when the bootstrap server is an IP
>> address, i.e. it will resolve the IP address to an FQDN and use that FQDN
>> in the SSL handshake.
>>
>> However, what I am proposing is to modify ClientUtils#resolve [2], which
>> is only used in ClusterConnectionStates#currentAddress [3], to get the
>> resolved InetAddress of the address to connect to. And
>> ClusterConnectionStates#currentAddress is only used by
>> NetworkClient#initiateConnect [4] to create InetSocketAddress to establish
>> the socket connection to the broker.
>>
>> Therefore, as far as I know, this change will not affect higher level
>> protocol like SSL or SASL.
>>
>> PR coming after this.
>>
>> Thanks
>> Badai
>>
>> [1]
>> https://github.com/apache/kafka/blob/2.5.0/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java#L51
>> [2]
>> https://github.com/apache/kafka/blob/2.5.0/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java#L111
>> [3]
>> https://github.com/apache/kafka/blob/2.5.0/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L403
>> [4]
>> https://github.com/apache/kafka/blob/2.5.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L955
>>
>> On Sun, May 10, 2020 at 10:18 AM Ismael Juma  wrote:
>>
>>> Hi Badai,
>>>
>>> I think this is a good change. Can you please address the issues raised
>>> by KIP-235? That was the reason why we did not do it previously.
>>>
>>> Ismael
>>>
>>> On Mon, Apr 27, 2020 at 5:46 PM Badai Aqrandista 
>>> wrote:
>>>
 Hi everyone

 I have opened this KIP to have client.dns.lookup default value changed
 to
 "use_all_dns_ips".


 https://cwiki.apache.org/confluence/display/KAFKA/KIP-602%3A+Change+default+value+for+client.dns.lookup

 Feedback appreciated.

 PS: I'm new here so please let me know if I miss anything.

 --
 Thanks,
 Badai

>>>
>>
>> --
>> Thanks,
>> Badai
>>
>>
>
> --
> Thanks,
> Badai
>
>

-- 
Thanks,
Badai
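
For context, a minimal, self-contained Java sketch of what resolving every DNS IP for a
bootstrap host means at connect time. This is illustrative only and is not the Kafka
ClientUtils/NetworkClient code:

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: resolve all A/AAAA records for a bootstrap host and turn each
// one into a candidate socket address to try.
public class ResolveAllIpsSketch {
    static List<InetSocketAddress> resolveAll(String host, int port) throws UnknownHostException {
        List<InetSocketAddress> candidates = new ArrayList<>();
        for (InetAddress address : InetAddress.getAllByName(host)) {
            // Each resolved IP becomes a separate connection candidate. The original hostname,
            // not a reverse-resolved FQDN, would still be what a real client uses for TLS
            // hostname verification, which is the point made above about SSL being unaffected.
            candidates.add(new InetSocketAddress(address, port));
        }
        return candidates;
    }

    public static void main(String[] args) throws UnknownHostException {
        resolveAll("example.com", 9092).forEach(System.out::println);
    }
}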


[VOTE] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
Hello all,

I'd like to open a vote for KIP-610:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors

Thanks,
Aakash


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
My apologies, had a typo. Meant to say "I will now open up a vote."

Thanks,
Aakash

On Sun, May 17, 2020 at 4:55 PM Aakash Shah  wrote:

> Hi all,
>
> Thanks for all the feedback thus far. I've updated the KIP with all the
> suggestions. I will not open up a vote.
>
> Thanks,
> Aakash
>
> On Sun, May 17, 2020 at 3:45 PM Randall Hauch  wrote:
>
>> All good points regarding `Future<Void>` instead of
>> `Future<RecordMetadata>`, so +1 to that change.
>>
>> A few more nits. The following sentences should be removed because they
>> actually describe a change from the current DLQ functionality that already
>> sets `max.in.flight.requests.per.connection=1` by default:
>>
>> "In order to avoid error records being written out of order (for example,
>> due to retries), the developer should always use
>> max.in.flight.requests.per.connection=1 in their implementation for
>> writing
>> error records. If the developer determines that order is not important and
>> they want extreme performance, they can always increase this number."
>>
>> Another is a bit ambiguous, so I suggest changing:
>>
>> "The error reporting functionality is designed to be asynchronous but can
>> be made synchronous if desired. By default, error reporting will be
>> asynchronous; processing of the subsequent errant record will not be
>> blocked by the successful processing of the prior errant record. However,
>> if the developer prefers synchronous functionality, they can block
>> processing of the next record with future.get()."
>>
>> to:
>>
>> "The error reporting functionality is asynchronous. Tasks can use the
>> resulting future to wait for the record and exception to be written to
>> Kafka."
>>
>> Can we please move the example to a new "Example Usage" section that is
>> *after* the "Interface" section? That way the order of the sections is
>> "Method", "Interface", and "Example Usage", and it's more clear how the
>> API
>> is being changed. Also, the first sentence introducing the example is
>> currently:
>>
>> "The usage will look like the following:"
>>
>> but IMO it should actually say it's an example:
>>
>> "The following is an example of how a sink task can use this error
>> reporter
>> and support connectors being deployed in earlier versions of the Connect
>> runtime:"
>>
>> It seems we have pretty good consensus, so I think the KIP is ready for a
>> vote after the above minor corrections are made.
>>
>> Best regards,
>>
>> Randall
>>
>> On Sun, May 17, 2020 at 4:51 PM Arjun Satish 
>> wrote:
>>
>> > Thanks for all the feedback, folks.
>> >
>> > re: having a callback as a parameter, I agree that at this point, it
>> might
>> > not add much value to the proposal.
>> >
>> > re: synchronous vs asynchronous, is the motivation performance/higher
>> > throughput? Taking a step back, calling report(..) in the new interface
>> > does a couple of things:
>> >
>> > 1. at a fundamental level, it is a signal to the framework that a
>> failure
>> > occurred when processing records, specifically due to the given record.
>> > 2. depending on whether errors.log and errors.deadletterqueue has been
>> set,
>> > some messages are written to zero or more destinations.
>> > 3. depending on the value of errors.tolerance (none or all), the task is
>> > failed after reporters have completed.
>> >
>> > for kip-610, the asynchronous method has the advantage of working with
>> the
>> > internal dead letter queue (which has been transparent to the developer
>> so
>> > far). but, how does async method help if the DLQ is not enabled? in this
>> > case RecordMetadata is not very useful, AFAICT? also, if we add more
>> error
>> > reporters in the future (say, for example, a new reporter in a future
>> that
>> > writes to a RDBMS), would the async version return success on all or
>> > nothing, and what about partial successes?
>> >
>> > overall, if we really need async behavior, I'd prefer to just use
>> > Future<Void>. but if we can keep it simple, then let's go with a
>> > synchronous function with the parameters Randall proposed above (with
>> > return type as void, and if any of the reporters fail, the task is
>> failed
>> > if error.tolerance is none, and kept alive if tolerance is all), and
>> maybe
>> > add asynchronous methods in a future KIP?
>> >
>> > Best,
>> >
>>
>


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
Hi all,

Thanks for all the feedback thus far. I've updated the KIP with all the
suggestions. I will not open up a vote.

Thanks,
Aakash

On Sun, May 17, 2020 at 3:45 PM Randall Hauch  wrote:

> All good points regarding `Future<Void>` instead of
> `Future<RecordMetadata>`, so +1 to that change.
>
> A few more nits. The following sentences should be removed because they
> actually describe a change from the current DLQ functionality that already
> sets `max.in.flight.requests.per.connection=1` by default:
>
> "In order to avoid error records being written out of order (for example,
> due to retries), the developer should always use
> max.in.flight.requests.per.connection=1 in their implementation for writing
> error records. If the developer determines that order is not important and
> they want extreme performance, they can always increase this number."
>
> Another is a bit ambiguous, so I suggest changing:
>
> "The error reporting functionality is designed to be asynchronous but can
> be made synchronous if desired. By default, error reporting will be
> asynchronous; processing of the subsequent errant record will not be
> blocked by the successful processing of the prior errant record. However,
> if the developer prefers synchronous functionality, they can block
> processing of the next record with future.get()."
>
> to:
>
> "The error reporting functionality is asynchronous. Tasks can use the
> resulting future to wait for the record and exception to be written to
> Kafka."
>
> Can we please move the example to a new "Example Usage" section that is
> *after* the "Interface" section? That way the order of the sections is
> "Method", "Interface", and "Example Usage", and it's more clear how the API
> is being changed. Also, the first sentence introducing the example is
> currently:
>
> "The usage will look like the following:"
>
> but IMO it should actually say it's an example:
>
> "The following is an example of how a sink task can use this error reporter
> and support connectors being deployed in earlier versions of the Connect
> runtime:"
>
> It seems we have pretty good consensus, so I think the KIP is ready for a
> vote after the above minor corrections are made.
>
> Best regards,
>
> Randall
>
> On Sun, May 17, 2020 at 4:51 PM Arjun Satish 
> wrote:
>
> > Thanks for all the feedback, folks.
> >
> > re: having a callback as a parameter, I agree that at this point, it
> might
> > not add much value to the proposal.
> >
> > re: synchronous vs asynchronous, is the motivation performance/higher
> > throughput? Taking a step back, calling report(..) in the new interface
> > does a couple of things:
> >
> > 1. at a fundamental level, it is a signal to the framework that a failure
> > occurred when processing records, specifically due to the given record.
> > 2. depending on whether errors.log and errors.deadletterqueue has been
> set,
> > some messages are written to zero or more destinations.
> > 3. depending on the value of errors.tolerance (none or all), the task is
> > failed after reporters have completed.
> >
> > for kip-610, the asynchronous method has the advantage of working with
> the
> > internal dead letter queue (which has been transparent to the developer
> so
> > far). but, how does async method help if the DLQ is not enabled? in this
> > case RecordMetadata is not very useful, AFAICT? also, if we add more
> error
> > reporters in the future (say, for example, a new reporter in a future
> that
> > writes to a RDBMS), would the async version return success on all or
> > nothing, and what about partial successes?
> >
> > overall, if we really need async behavior, I'd prefer to just use
> > Future<Void>. but if we can keep it simple, then let's go with a
> > synchronous function with the parameters Randall proposed above (with
> > return type as void, and if any of the reporters fail, the task is failed
> > if error.tolerance is none, and kept alive if tolerance is all), and
> maybe
> > add asynchronous methods in a future KIP?
> >
> > Best,
> >
>


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Randall Hauch
All good points regarding `Future<Void>` instead of
`Future<RecordMetadata>`, so +1 to that change.

A few more nits. The following sentences should be removed because they
actually describe a change from the current DLQ functionality that already
sets `max.in.flight.requests.per.connection=1` by default:

"In order to avoid error records being written out of order (for example,
due to retries), the developer should always use
max.in.flight.requests.per.connection=1 in their implementation for writing
error records. If the developer determines that order is not important and
they want extreme performance, they can always increase this number."

Another is a bit ambiguous, so I suggest changing:

"The error reporting functionality is designed to be asynchronous but can
be made synchronous if desired. By default, error reporting will be
asynchronous; processing of the subsequent errant record will not be
blocked by the successful processing of the prior errant record. However,
if the developer prefers synchronous functionality, they can block
processing of the next record with future.get()."

to:

"The error reporting functionality is asynchronous. Tasks can use the
resulting future to wait for the record and exception to be written to
Kafka."

Can we please move the example to a new "Example Usage" section that is
*after* the "Interface" section? That way the order of the sections is
"Method", "Interface", and "Example Usage", and it's more clear how the API
is being changed. Also, the first sentence introducing the example is
currently:

"The usage will look like the following:"

but IMO it should actually say it's an example:

"The following is an example of how a sink task can use this error reporter
and support connectors being deployed in earlier versions of the Connect
runtime:"

It seems we have pretty good consensus, so I think the KIP is ready for a
vote after the above minor corrections are made.

Best regards,

Randall

On Sun, May 17, 2020 at 4:51 PM Arjun Satish  wrote:

> Thanks for all the feedback, folks.
>
> re: having a callback as a parameter, I agree that at this point, it might
> not add much value to the proposal.
>
> re: synchronous vs asynchronous, is the motivation performance/higher
> throughput? Taking a step back, calling report(..) in the new interface
> does a couple of things:
>
> 1. at a fundamental level, it is a signal to the framework that a failure
> occurred when processing records, specifically due to the given record.
> 2. depending on whether errors.log and errors.deadletterqueue has been set,
> some messages are written to zero or more destinations.
> 3. depending on the value of errors.tolerance (none or all), the task is
> failed after reporters have completed.
>
> for kip-610, the asynchronous method has the advantage of working with the
> internal dead letter queue (which has been transparent to the developer so
> far). but, how does async method help if the DLQ is not enabled? in this
> case RecordMetadata is not very useful, AFAICT? also, if we add more error
> reporters in the future (say, for example, a new reporter in a future that
> writes to a RDBMS), would the async version return success on all or
> nothing, and what about partial successes?
>
> overall, if we really need async behavior, I'd prefer to just use
> Future. but if we can keep it simple, then let's go with a
> synchronous function with the parameters Randall proposed above (with
> return type as void, and if any of the reporters fail, the task is failed
> if error.tolerance is none, and kept alive if tolerance is all), and maybe
> add asynchronous methods in a future KIP?
>
> Best,
>


Re: Help! Can't add reviewers for Github Kafka PR

2020-05-17 Thread John Roesler
Ah, I just reread my message. I meant “committer”, of course.

Thanks,
John

On Sun, May 17, 2020, at 16:10, Kowshik Prakasam wrote:
> Thanks, John!
> 
> 
> Cheers,
> Kowshik
> 
> On Sun, May 17, 2020 at 8:12 AM John Roesler  wrote:
> > Hi Kowshik,
> > 
> >  You just have to “@“ mention the username of the person you want in a gh 
> > comment. I think you have to be a committee to add labels, reviewers, etc. 
> > 
> >  Hope this helps!
> >  -John
> > 
> >  On Sun, May 17, 2020, at 04:11, Kowshik Prakasam wrote:
> >  > Hi all,
> >  > 
> >  > My intent is to create a PR for review in 
> > https://github.com/apache/kafka .
> >  > However I find that I'm unable to add reviewers to my PR. Does this need
> >  > any specific permissions? If so, please could someone grant me access or
> >  > help me understand what I need to do to get permissions to add reviewers?
> >  > 
> >  > 
> >  > Cheers,
> >  > Kowshik
> >  >


Re: [DISCUSS] KIP-553: Enable TLSv1.3 by default and disable all protocols except [TLSV1.2, TLSV1.3]

2020-05-17 Thread Ismael Juma
Hi Nikolay,

Quick question, the following is meant to include TLSv1.3 as well, right?

Change the value of the SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS to
> "TLSv1.2"


In addition, two more questions:

1. `ssl.protocol` would remain TLSv1.2 with this change. It would be good
to explain why that's OK.
2. What is the behavior for people who have configured `ssl.cipher.suites`?
The cipher suite names are different in TLS 1.3. What would be the behavior
if the client requests TLS 1.3, but the server only has cipher suites for
TLS 1.2? It would be good to explain the expected behavior and add tests to
verify it.

Ismael
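
For question 2, a small JSSE-only snippet like the following can be used to see which
protocols and cipher suites a given JVM enables by default (illustrative helper, not part
of the KIP or the Kafka code):

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.util.Arrays;

// Print the protocols and cipher suites the local JVM enables by default. TLSv1.3 suites
// use distinct names (e.g. TLS_AES_128_GCM_SHA256) from the TLS_ECDHE_* style names of
// TLSv1.2, which is why a cipher suite list tuned for 1.2 cannot negotiate 1.3.
public class ListTlsDefaults {
    public static void main(String[] args) throws Exception {
        SSLEngine engine = SSLContext.getDefault().createSSLEngine();
        System.out.println("Enabled protocols: " + Arrays.toString(engine.getEnabledProtocols()));
        System.out.println("Enabled cipher suites: " + Arrays.toString(engine.getEnabledCipherSuites()));
    }
}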

On Thu, Apr 30, 2020 at 9:47 AM Nikolay Izhikov  wrote:

> Ticket created:
>
> https://issues.apache.org/jira/browse/KAFKA-9943
>
> I will prepare the PR, shortly.
>
> > On 27 Apr 2020, at 17:55, Ismael Juma wrote:
> >
> > Yes, a PR would be great.
> >
> > Ismael
> >
> > On Mon, Apr 27, 2020, 2:10 AM Nikolay Izhikov 
> wrote:
> >
> >> Hello, Ismael.
> >>
> >> AFAIK we don’t run tests with TLSv1.3 by default.
> >> Are you suggesting to do it?
> >> I can create a PR for it.
> >>
> >>> On 24 Apr 2020, at 17:34, Ismael Juma wrote:
> >>>
> >>> Right, some companies run them nightly. What I meant to ask is if we
> >>> changed the configuration so that TLS 1.3 is exercised in the system
> >> tests
> >>> by default.
> >>>
> >>> Ismael
> >>>
> >>> On Fri, Apr 24, 2020 at 7:32 AM Nikolay Izhikov 
> >> wrote:
> >>>
>  Hello, Ismael.
> 
>  AFAIK we don’t run system tests nightly.
>  Do we have resources to run system tests periodically?
> 
>  When I did the testing I used servers my employer gave me.
> 
> > On 24 Apr 2020, at 08:05, Ismael Juma wrote:
> >
> > Hi Nikolay,
> >
> > Seems like we have been able to run the system tests with TLS 1.3. Do
> >> we
> > run them nightly?
> >
> > Ismael
> >
> > On Fri, Feb 14, 2020 at 4:17 AM Nikolay Izhikov  >
>  wrote:
> >
> >> Hello, Kafka team.
> >>
> >> I ran the system tests that use SSL with TLSv1.3.
> >> You can find the results of the tests in the Jira ticket [1], [2],
> >> [3],
> >> [4].
> >>
> >> I also needed changes [5] in `security_config.py` to execute the system
>  tests
> >> with TLSv1.3 (more info in the PR description).
> >> Please, take a look.
> >>
> >> Test environment:
> >>  • openjdk11
> >>  • trunk + changes from my PR [5].
> >>
> >> The full system test results are about 15 GB.
> >> Should I share full logs with you?
> >>
> >> What else should be done before we can enable TLSv1.3 by default?
> >>
> >> [1]
> >>
> 
> >>
> https://issues.apache.org/jira/browse/KAFKA-9319?focusedCommentId=17036927=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17036927
> >>
> >> [2]
> >>
> 
> >>
> https://issues.apache.org/jira/browse/KAFKA-9319?focusedCommentId=17036928=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17036928
> >>
> >> [3]
> >>
> 
> >>
> https://issues.apache.org/jira/browse/KAFKA-9319?focusedCommentId=17036929=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17036929
> >>
> >> [4]
> >>
> 
> >>
> https://issues.apache.org/jira/browse/KAFKA-9319?focusedCommentId=17036930=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17036930
> >>
> >> [5]
> >>
> 
> >>
> https://github.com/apache/kafka/pull/8106/files#diff-6dd015b94706f6920d9de524c355ddd8R51
> >>
> >>> On 29 Jan 2020, at 15:27, Nikolay Izhikov wrote:
> >>>
> >>> Hello, Rajini.
> >>>
> >>> Thanks for the feedback.
> >>>
> >>> I’ve searched tests by the «ssl» keyword and found the following
> >> tests:
> >>>
> >>> ./test/kafkatest/services/kafka_log4j_appender.py
> >>> ./test/kafkatest/services/listener_security_config.py
> >>> ./test/kafkatest/services/security/security_config.py
> >>> ./test/kafkatest/tests/core/security_test.py
> >>>
> >>> Are these all the tests that need to be run with TLSv1.3 to ensure we
> >> can
> >> enable it by default?
> >>>
 On 28 Jan 2020, at 14:58, Rajini Sivaram wrote:
> 
>  Hi Nikolay,
> 
>  Not sure of the total space required. But you can run a collection
> >> of
> >> tests at a time instead of running them all together. That way, you
>  could
> >> just run all the tests that enable SSL. Details of running a subset
> of
> >> tests are in the README in tests.
> 
>  On Mon, Jan 27, 2020 at 6:29 PM Nikolay Izhikov <
> >> nizhi...@apache.org>
> >> wrote:
>  Hello, Rajini.
> 
 I tried to run all the system tests but failed for now.
 It turns out that the system tests generate a lot of logs.
>  I had a 250GB 

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Arjun Satish
Thanks for all the feedback, folks.

re: having a callback as a parameter, I agree that at this point, it might
not add much value to the proposal.

re: synchronous vs asynchronous, is the motivation performance/higher
throughput? Taking a step back, calling report(..) in the new interface
does a couple of things:

1. at a fundamental level, it is a signal to the framework that a failure
occurred when processing records, specifically due to the given record.
2. depending on whether errors.log and errors.deadletterqueue has been set,
some messages are written to zero or more destinations.
3. depending on the value of errors.tolerance (none or all), the task is
failed after reporters have completed.

for kip-610, the asynchronous method has the advantage of working with the
internal dead letter queue (which has been transparent to the developer so
far). but, how does async method help if the DLQ is not enabled? in this
case RecordMetadata is not very useful, AFAICT? also, if we add more error
reporters in the future (say, for example, a new reporter in a future that
writes to a RDBMS), would the async version return success on all or
nothing, and what about partial successes?

overall, if we really need async behavior, I'd prefer to just use
Future<Void>. but if we can keep it simple, then let's go with a
synchronous function with the parameters Randall proposed above (with
return type as void, and if any of the reporters fail, the task is failed
if error.tolerance is none, and kept alive if tolerance is all), and maybe
add asynchronous methods in a future KIP?

Best,
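
For concreteness, a rough sketch of the asynchronous usage pattern discussed in this
thread. The reporter interface and all names below are assumptions for illustration, not
the final KIP-610 API:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical reporter shape for illustration only.
interface ErrantRecordReporter {
    Future<Void> report(Object record, Throwable error);
}

class ExampleSinkTaskLogic {
    private final ErrantRecordReporter reporter;
    private final List<Future<Void>> pendingReports = new ArrayList<>();

    ExampleSinkTaskLogic(ErrantRecordReporter reporter) {
        this.reporter = reporter;
    }

    void put(Collection<Object> records) {
        for (Object record : records) {
            try {
                sendToExternalSystem(record);
            } catch (Exception e) {
                // Asynchronous: keep processing the rest of the batch while the errant
                // record is reported in the background.
                pendingReports.add(reporter.report(record, e));
            }
        }
    }

    void beforeCommit() throws ExecutionException, InterruptedException {
        // Block until all errant records have been reported so offsets are not
        // committed past records that were neither delivered nor reported.
        for (Future<Void> f : pendingReports) {
            f.get();
        }
        pendingReports.clear();
    }

    private void sendToExternalSystem(Object record) { /* write to the sink system */ }
}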


Re: Help! Can't add reviewers for Github Kafka PR

2020-05-17 Thread Kowshik Prakasam
Thanks, John!


Cheers,
Kowshik

On Sun, May 17, 2020 at 8:12 AM John Roesler  wrote:

> Hi Kowshik,
>
> You just have to “@“ mention the username of the person you want in a gh
> comment. I think you have to be a committee to add labels, reviewers, etc.
>
> Hope this helps!
> -John
>
> On Sun, May 17, 2020, at 04:11, Kowshik Prakasam wrote:
> > Hi all,
> >
> > My intent is to create a PR for review in
> https://github.com/apache/kafka .
> > However I find that I'm unable to add reviewers to my PR. Does this need
> > any specific permissions? If so, please could someone grant me access or
> > help me understand what I need to do to get permissions to add reviewers?
> >
> >
> > Cheers,
> > Kowshik
> >
>


Build failed in Jenkins: kafka-trunk-jdk14 #97

2020-05-17 Thread Apache Jenkins Server
See 


Changes:

[github] Update Gradle to 6.4.1 (#8678)


--
[...truncated 3.09 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = 

Build failed in Jenkins: kafka-trunk-jdk8 #4538

2020-05-17 Thread Apache Jenkins Server
See 


Changes:

[github] Update Gradle to 6.4.1 (#8678)


--
[...truncated 3.07 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

> Task :streams:upgrade-system-tests-0100:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:testClasses
> Task :streams:upgrade-system-tests-0100:checkstyleTest
> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:test
> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0101:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0101:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:compileTestJava
> Task :streams:upgrade-system-tests-0101:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:testClasses
> Task :streams:upgrade-system-tests-0101:checkstyleTest
> Task :streams:upgrade-system-tests-0101:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:test
> Task :streams:upgrade-system-tests-0102:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0102:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0102:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0102:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0102:compileTestJava
> Task :streams:upgrade-system-tests-0102:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0102:testClasses
> Task 

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Chris Egerton
Hi Konstantine,

Given that the reporter interface is intentionally agnostic about how
records are handled and does not necessarily entail writes to a DLQ, I'm
also in favor of not specifying a return type for the reporting mechanism.

I'm still unclear on how futures are going to provide any benefit to
developers, though. Blocking on the return of such a future slightly later
on in the process of handling records is still blocking, and to be done
truly asynchronously without blocking processing of non-errant records,
would have to be done on a separate thread. It's technically possible for
users to cache all of these futures and instead of invoking "get" on them,
simply check whether they're complete or not via "isDone", but this seems
like an anti-pattern.

What is the benefit of wrapping this in a future?

With regards to Randall's comments about a callback, I'd like to make some
observations:

>  1. The `Callback` interface requires the sink task developer to handle
>   an error being passed to the callback, but in this case it's very
unlikely
>   the Connect runtime will ever call the callback with an error and will
>   instead handle it (e.g., retry forever, or fail the task, etc.). IOW,
the
>   `Callback` interface is appropriate for `Producer`, but it's far more
broad
>   than is needed here.

Since we're already introducing a new interface with the current proposal,
it seems like we might be able to implement our own interface for this
callback as well. One possibility is basically just a Consumer<SinkRecord>
that is given the errant sink record if/when that record has been
successfully reported by the framework. If the plan is still to fail the
task immediately if the framework fails to report an errant record provided
to it by a task, I don't think we need to have a mechanism to report such a
failure to the task by, e.g., throwing an exception, and can simply request
that the task shut down and stop giving it records.

>   2. Without the callback, the Connect runtime will not run task code
>   within the DLQ producer thread, and this is very safe. But when a
callback
>   is provided, that callback *will* be called on the DQL producer's
thread --
>   and any mistakes in the sink task's callback may block the DLQ. IOW,
having
>   a callback is too risky.

With the current DLQ implementation, each task is given its own producer,
which is unlikely to change soon given that per-connector producer
overrides are now possible thanks to KIP-458 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy).
So the fallout of any errors in this callback would be limited solely to
the sink task that provided the callback. There's some precedent for this
with how the framework invokes "SourceTask::commitRecord" directly in the
producer callback for source tasks.

>   3. We actually don't know that a callback is even necessary.

There's a decent use case for a callback here where a task uses it for
tracking offsets to report in "SinkTask::preCommit" without blocking on the
result of an errant record report. This might even be simpler than a
future-based approach, depending on how we anticipate developers would use
that approach.

>   4. Just having the one `report(SinkRecord, Throwable)` is simpler and,
>   given the looming deadline, much closer to what we've already discussed.

Agreed that, no matter what, one asynchronous method is enough. One async
and one sync method might be reasonable if we really can't settle on a good
one-size-fits-all API but hopefully it won't come to that.

Cheers,

Chris
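
To make the callback alternative above concrete, a sketch of what such an interface and
the offset-tracking use case could look like (all names are hypothetical, not a proposed
Connect API):

import java.util.function.Consumer;

// Hypothetical callback-style reporter: the framework would invoke onReported once the
// errant record has been durably reported.
interface CallbackErrantRecordReporter<R> {
    void report(R record, Throwable error, Consumer<R> onReported);
}

class OffsetTrackingSketch<R> {
    // Point 3 above: mark an offset safe to commit only after the framework confirms the
    // errant record was reported, without blocking put() on a Future.
    void handleFailure(CallbackErrantRecordReporter<R> reporter, R record, Throwable error) {
        reporter.report(record, error, this::markOffsetSafeToCommit);
    }

    private void markOffsetSafeToCommit(R record) {
        // Track the record's offset for later return from a preCommit-style hook.
    }
}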

On Sun, May 17, 2020 at 12:06 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi all,
>
> I'm on board with adding an interface in the Connect API as Arjun
> suggested. Slightly higher commitment and maintenance but it also gives us
> an easier path to future extensions in this scope (error handling). The
> usage is equivalent to adding just a new method with known types to
> `SinkTaskContext` (the `NoClassDefFoundError` can be added for completeness
> in the connector code, but in both suggestions this would fail with
> `NoSuchMethodError` on older workers).
>
> With respect to the method signature, I also agree with Randall's latest
> suggestion, of a two argument method such as:
>
> Future<Void> report(SinkRecord, Throwable)
>
> Returning `Future<RecordMetadata>` can also be ok, but since this refers to
> the DLQ I'd slightly prefer to avoid exposing information that might
> confuse the users regarding what topic, partitions and offset this return
> value corresponds to. But both return types should be fine and will give
> plenty of flexibility to connector developers, making the sync use case
> straightforward. In any case, given the interface we can extend this in a
> compatible way in the future if we think we need to.
>
> Minor comments:
> Version will be 2.6 and not 2.9 (the latter was added by accident in a few
> places).
>
> Best,
> Konstantine
>
>
> On Sun, May 17, 2020 at 11:25 

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
Hi all,

I've updated the KIP to reflect all the new agreed-upon suggestions.

Please let me know if you have any more suggestions.

Thanks,
Aakash

On Sun, May 17, 2020 at 12:06 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi all,
>
> I'm on board with adding an interface in the Connect API as Arjun
> suggested. Slightly higher commitment and maintenance but it also gives us
> an easier path to future extensions in this scope (error handling). The
> usage is equivalent to adding just a new method with known types to
> `SinkTaskContext` (the `NoClassDefFoundError` can be added for completeness
> in the connector code, but in both suggestions this would fail with
> `NoSuchMethodError` on older workers).
>
> With respect to the method signature, I also agree with Randall's latest
> suggestion, of a two argument method such as:
>
> Future<Void> report(SinkRecord, Throwable)
>
> Returning `Future<RecordMetadata>` can also be ok, but since this refers to
> the DLQ I'd slightly prefer to avoid exposing information that might
> confuse the users regarding what topic, partitions and offset this return
> value corresponds to. But both return types should be fine and will give
> plenty of flexibility to connector developers, making the sync use case
> straightforward. In any case, given the interface we can extend this in a
> compatible way in the future if we think we need to.
>
> Minor comments:
> Version will be 2.6 and not 2.9 (the latter was added by accident in a few
> places).
>
> Best,
> Konstantine
>
>
> On Sun, May 17, 2020 at 11:25 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > If that's the case, I think the framework should not commit if there are any
> > outstanding records in the reporter. That would prevent the scenario
> where
> > we could potentially lose records from being sent either to the sink/the
> > reporter. WDYT about the KIP including that as part of the design?
> >
> > On Sun, May 17, 2020 at 11:13 AM Randall Hauch  wrote:
> >
> > > On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> > > mage...@confluent.io> wrote:
> > >
> > > > Randall
> > > >
> > > > Thanks a lot for your thoughts. I was wondering if we would ever have
> > to
> > > > make the API asynchronous, we could expose it as a new method right?
> If
> > > > that's a possibility would it be better if the API explicitly has
> > > semantics
> > > > of a synchronous API if the implementation is indeed going to be
> > > > synchronous.
> > > >
> > >
> > > Thanks, Magesh.
> > >
> > > I think it's likely that the implementation may need to be synchronous
> to
> > > some degree. For example, just to keep the implementation simple we
> might
> > > block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns; we
> > > might latch until the reporter has received all acks, especially if it
> > > simplifies the offset management and commit logic.
> > >
> > > Even if that's the case, having each `report(...)` call be asynchronous
> > > means that the sink task doesn't *have* to wait until each failed
> record
> > > has been recorded to continue sending valid records to the external
> > system.
> > > Consider an example with 1000 records in a batch, where only the first
> > > record has an error. If `record(...)` were synchronous, the `put(...)`
> > > method would block reporting the first record and would then only send
> > the
> > > 999 after that's happened. With an asynchronous `record(...)` method,
> the
> > > `put(...)` method could report the first record, send the 999 records,
> > and
> > > then wait for the futures returned by the report method.
> > >
> > >
> > > >
> > > > On Sun, May 17, 2020, 9:27 AM Randall Hauch 
> wrote:
> > > >
> > > > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > > > mage...@confluent.io> wrote:
> > > > >
> > > > > > Thanks Randall. The suggestion i made also has a problem when
> > > reporter
> > > > > > isn't enabled where it could potentially write records after
> error
> > > > > records
> > > > > > to sink before failing.
> > > > > >
> > > > > > The other concern i had with reporter being asynchronous. For
> some
> > > > reason
> > > > > > if the reporter is taking longer because of say a specific broker
> > > > issue,
> > > > > > the connector might still move forward and commit if it's not
> > waiting
> > > > for
> > > > > > the reporter.  During  this if the worker crashes we will now
> lose
> > > the
> > > > > bad
> > > > > > record
> > > > > >  I don't think this is desirable behavior. I think the
> synchronous
> > > > > reporter
> > > > > > provides better guarantees for all connectors.
> > > > > >
> > > > > >
> > > > > Thanks, Magesh.
> > > > >
> > > > > That's a valid concern, and maybe that will affect how the feature
> is
> > > > > actually implemented. I expect it to be a bit tricky to ensure that
> > > > errant
> > > > > records are fully written to Kafka before the offsets are
> committed,
> > so
> > > > it
> > > > > might be simplest to start out with a synchronous 

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Konstantine Karantasis
Hi all,

I'm on board with adding an interface in the Connect API as Arjun
suggested. Slightly higher commitment and maintenance but it also gives us
an easier path to future extensions in this scope (error handling). The
usage is equivalent to adding just a new method with known types to
`SinkTaskContext` (the `NoClassDefFoundError` can be added for completeness
in the connector code, but in both suggestions this would fail with
`NoSuchMethodError` on older workers).

With respect to the method signature, I also agree with Randall's latest
suggestion, of a two argument method such as:

Future<Void> report(SinkRecord, Throwable)

Returning `Future<RecordMetadata>` can also be ok, but since this refers to
the DLQ I'd slightly prefer to avoid exposing information that might
confuse the users regarding what topic, partitions and offset this return
value corresponds to. But both return types should be fine and will give
plenty of flexibility to connector developers, making the sync use case
straightforward. In any case, given the interface we can extend this in a
compatible way in the future if we think we need to.

Minor comments:
Version will be 2.6 and not 2.9 (the latter was added by accident in a few
places).

Best,
Konstantine


On Sun, May 17, 2020 at 11:25 AM Magesh kumar Nandakumar <
mage...@confluent.io> wrote:

> If that's the case, I think the framework should not commit if there are any
> outstanding records in the reporter. That would prevent the scenario where
> we could potentially lose records from being sent either to the sink/the
> reporter. WDYT about the KIP including that as part of the design?
>
> On Sun, May 17, 2020 at 11:13 AM Randall Hauch  wrote:
>
> > On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> > mage...@confluent.io> wrote:
> >
> > > Randall
> > >
> > > Thanks a lot for your thoughts. I was wondering if we would ever have
> to
> > > make the API asynchronous, we could expose it as a new method right? If
> > > that's a possibility would it be better if the API explicitly has
> > semantics
> > > of a synchronous API if the implementation is indeed going to be
> > > synchronous.
> > >
> >
> > Thanks, Magesh.
> >
> > I think it's likely that the implementation may need to be synchronous to
> > some degree. For example, just to keep the implementation simple we might
> > block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns; we
> > might latch until the reporter has received all acks, especially if it
> > simplifies the offset management and commit logic.
> >
> > Even if that's the case, having each `report(...)` call be asynchronous
> > means that the sink task doesn't *have* to wait until each failed record
> > has been recorded to continue sending valid records to the external
> system.
> > Consider an example with 1000 records in a batch, where only the first
> > record has an error. If `record(...)` were synchronous, the `put(...)`
> > method would block reporting the first record and would then only send
> the
> > 999 after that's happened. With an asynchronous `record(...)` method, the
> > `put(...)` method could report the first record, send the 999 records,
> and
> > then wait for the futures returned by the report method.
> >
> >
> > >
> > > On Sun, May 17, 2020, 9:27 AM Randall Hauch  wrote:
> > >
> > > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > > mage...@confluent.io> wrote:
> > > >
> > > > > Thanks Randall. The suggestion i made also has a problem when
> > reporter
> > > > > isn't enabled where it could potentially write records after error
> > > > records
> > > > > to sink before failing.
> > > > >
> > > > > The other concern i had with reporter being asynchronous. For some
> > > reason
> > > > > if the reporter is taking longer because of say a specific broker
> > > issue,
> > > > > the connector might still move forward and commit if it's not
> waiting
> > > for
> > > > > the reporter.  During  this if the worker crashes we will now lose
> > the
> > > > bad
> > > > > record
> > > > >  I don't think this is desirable behavior. I think the synchronous
> > > > reporter
> > > > > provides better guarantees for all connectors.
> > > > >
> > > > >
> > > > Thanks, Magesh.
> > > >
> > > > That's a valid concern, and maybe that will affect how the feature is
> > > > actually implemented. I expect it to be a bit tricky to ensure that
> > > errant
> > > > records are fully written to Kafka before the offsets are committed,
> so
> > > it
> > > > might be simplest to start out with a synchronous implementation. But
> > the
> > > > API can still be an asynchronous design whether or not the
> > implementation
> > > > is synchronous. That gives us the ability in the future to change the
> > > > implementation if we determine a way to handle all concerns. For
> > example,
> > > > the WorkerSinkTask may need to backoff if waiting to commit due to
> too
> > > many
> > > > incomplete/unacknowledged reporter requests. OTOH, if we make the
> > > `report`
> > > > method(s) 

[jira] [Created] (KAFKA-10011) lockedTaskDirectories should be cleared when task gets closed dirty in HandleLostAll

2020-05-17 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10011:
---

 Summary: lockedTaskDirectories should be cleared when task gets 
closed dirty in HandleLostAll
 Key: KAFKA-10011
 URL: https://issues.apache.org/jira/browse/KAFKA-10011
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


Tasks that get closed in handleLostAll don't clear their entries from 
lockedTaskDirectories, which causes an illegal state afterwards:
{code:java}
[2020-05-17T06:21:54-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) [2020-05-17 
13:21:54,127] ERROR 
[stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] 
stream-thread 
[stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] 
Encountered the following exception during processing and the thread is going 
to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-17T06:21:54-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) 
org.apache.kafka.streams.errors.ProcessorStateException: task directory 
[/mnt/run/streams/state/stream-soak-test/3_1] doesn't exist and couldn't be 
created
        at 
org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
        at 
org.apache.kafka.streams.processor.internals.StateDirectory.checkpointFileFor(StateDirectory.java:121)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.getTaskOffsetSums(TaskManager.java:498)
        at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscriptionUserData(StreamsPartitionAssignor.java:239)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:222)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:560)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:495)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:417)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1265)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:770)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:630)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
{code}
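
A rough sketch of the kind of fix this implies (illustrative only; the helper names are hypothetical, not the actual TaskManager code):
{code:java}
// Illustrative: when tasks are closed dirty in the lost-all path, their entries must
// also be removed from lockedTaskDirectories (and the directory locks released), so a
// later getTaskOffsetSums() does not assume the task directories are still held.
void handleLostAll() {
    for (final TaskId id : idsOfTasksClosedDirty()) {   // hypothetical helper
        if (lockedTaskDirectories.remove(id)) {
            releaseTaskDirectoryLock(id);               // hypothetical helper
        }
    }
}
{code}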



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-609: Use Pre-registration and Blocking Calls for Better Transaction Efficiency

2020-05-17 Thread Guozhang Wang
My point here is only about the first AddPartitionsToTxn request of the
transaction, since only that request would potentially be blocked waiting
for the previous txn to complete. By deferring it we reduce the blocking time.

I think StreamsConfig overrides linger.ms to 100ms, not 10ms, so in the
best case we can defer the first AddPartitionsToTxn of the transaction by
100ms.

Guozhang
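
To make the batching point concrete, here is a rough, self-contained sketch
(hypothetical names, not the actual Sender/TransactionManager code) of deferring
the registration from send() until the first batch is about to be drained:

import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of deferring AddPartitionsToTxn: partitions touched by
// send() are only accumulated locally, and the registration request is issued once,
// right before the first batch is actually drained to the broker.
public class DeferredTxnRegistration {

    private final Set<String> pendingPartitions = new HashSet<>();
    private boolean registered = false;

    // Called from send(): cheap, non-blocking bookkeeping only.
    public synchronized void onSend(String topicPartition) {
        pendingPartitions.add(topicPartition);
    }

    // Called by the sender thread just before draining the first batch of the
    // transaction; this is where the (potentially blocking) registration happens.
    public synchronized void beforeFirstDrain() {
        if (!registered && !pendingPartitions.isEmpty()) {
            sendAddPartitionsToTxn(pendingPartitions); // placeholder for the real request
            registered = true;
        }
    }

    private void sendAddPartitionsToTxn(Set<String> partitions) {
        System.out.println("AddPartitionsToTxn(" + partitions + ")");
    }

    public static void main(String[] args) {
        DeferredTxnRegistration txn = new DeferredTxnRegistration();
        txn.onSend("topic-0");      // send() returns immediately
        txn.onSend("topic-1");      // batching window (e.g. linger.ms=100) elapses...
        txn.beforeFirstDrain();     // ...and registration is issued once, just in time
    }
}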


On Sat, May 16, 2020 at 12:20 PM Boyang Chen 
wrote:

> Thanks Guozhang for the context. The producer batch is either bounded by
> the size or the linger time. For the default 10ms linger and 100ms
> transaction commit time, the producer will be capped by
> AddPartitionToTxn 10 times in the worst case. I think the improvement here
> aims for the worst case scenario for users who didn't realize how the
> internal works, and uses the API calls in a very inefficient way as the
> scenario where record processing and send() happen concurrently.
>
> Boyang
>
> On Sat, May 16, 2020 at 10:19 AM Guozhang Wang  wrote:
>
> > Hello Boyang,
> >
> > Thanks for the proposed KIP, overall it makes sense to me.
> >
> > One non-public API related point that I'd like to make though, is that in
> > KafkaProducer.send call we can potentially defer sending
> > AddPartitionsToTxn request until the sender is about to send the first
> > batch -- this is what I observed from some soak testing investigation
> such
> > that the batching effects actually allows the first record to be sent
> much
> > later than the send() call and that can be leveraged to further reduce
> the
> > time that we would be blocked on the AddPartitionsToTxn request.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 14, 2020 at 10:26 PM Boyang Chen  >
> > wrote:
> >
> > > Hey all,
> > >
> > > I would like to start the discussion for KIP-609:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-609%3A+Use+Pre-registration+and+Blocking+Calls+for+Better+Transaction+Efficiency
> > >
> > > This KIP aims to improve the current EOS semantic which makes the
> > > processing more efficient and consolidated.
> > >
> > > Thanks!
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
If that's the case, I think the framework should not commit if there are any
outstanding records in the reporter. That would prevent the scenario where
we could potentially lose records from being sent either to the sink or the
reporter. WDYT about the KIP including that as part of the design?
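
As a rough illustration of that guarantee (hypothetical names, not the actual
WorkerSinkTask code), the framework could track the futures returned by the
reporter and drain them before committing offsets:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: offsets are only committed once every errant-record write
// handed to the reporter has been acknowledged.
public class CommitBarrier {

    private final List<Future<Void>> outstandingReports = new ArrayList<>();

    // Called whenever the task (or framework) reports an errant record.
    public void track(Future<Void> reportResult) {
        outstandingReports.add(reportResult);
    }

    // Called before the framework commits consumer offsets.
    public void awaitOutstandingReports() {
        for (Future<Void> f : outstandingReports) {
            try {
                f.get(); // block until the DLQ write is acknowledged
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for errant-record writes", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Errant-record write failed; not committing offsets", e);
            }
        }
        outstandingReports.clear();
    }

    public static void main(String[] args) {
        CommitBarrier barrier = new CommitBarrier();
        barrier.track(CompletableFuture.completedFuture(null));
        barrier.awaitOutstandingReports(); // safe to commit offsets after this returns
    }
}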

On Sun, May 17, 2020 at 11:13 AM Randall Hauch  wrote:

> On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > Randall
> >
> > Thanks a lot for your thoughts. I was wondering if we would ever have to
> > make the API asynchronous, we could expose it as a new method right? If
> > that's a possibility would it be better if the API explicitly has
> semantics
> > of a synchronous API if the implementation is indeed going to be
> > synchronous.
> >
>
> Thanks, Magesh.
>
> I think it's likely that the implementation may need to be synchronous to
> some degree. For example, just to keep the implementation simple we might
> block the WorkerSinkTask after `put(Collection)` returns we
> might latch until the reporter has received all acks, especially if it
> simplifies the offset management and commit logic.
>
> Even if that's the case, having each `report(...)` call be asynchronous
> means that the sink task doesn't *have* to wait until each failed record
> has been recorded to continue sending valid records to the external system.
> Consider an example with 1000 records in a batch, where only the first
> record has an error. If `record(...)` were synchronous, the `put(...)`
> method would block reporting the first record and would then only send the
> 999 after that's happened. With an asynchronous `record(...)` method, the
> `put(...)` method could report the first record, send the 999 records, and
> then wait for the futures returned by the report method.
>
>
> >
> > On Sun, May 17, 2020, 9:27 AM Randall Hauch  wrote:
> >
> > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > mage...@confluent.io> wrote:
> > >
> > > > Thanks Randall. The suggestion i made also has a problem when
> reporter
> > > > isn't enabled where it could potentially write records after error
> > > records
> > > > to sink before failing.
> > > >
> > > > The other concern i had with reporter being asynchronous. For some
> > reason
> > > > if the reporter is taking longer because of say a specific broker
> > issue,
> > > > the connector might still move forward and commit if it's not waiting
> > for
> > > > the reporter.  During  this if the worker crashes we will now lose
> the
> > > bad
> > > > record
> > > >  I don't think this is desirable behavior. I think the synchronous
> > > reporter
> > > > provides better guarantees for all connectors.
> > > >
> > > >
> > > Thanks, Magesh.
> > >
> > > That's a valid concern, and maybe that will affect how the feature is
> > > actually implemented. I expect it to be a bit tricky to ensure that
> > errant
> > > records are fully written to Kafka before the offsets are committed, so
> > it
> > > might be simplest to start out with a synchronous implementation. But
> the
> > > API can still be an asynchronous design whether or not the
> implementation
> > > is synchronous. That gives us the ability in the future to change the
> > > implementation if we determine a way to handle all concerns. For
> example,
> > > the WorkerSinkTask may need to backoff if waiting to commit due to too
> > many
> > > incomplete/unacknowledged reporter requests. OTOH, if we make the
> > `report`
> > > method(s) synchronous from the beginning, it will be very challenging
> to
> > > change them in the future to be asynchronous.
> > >
> > > I guess it boils down to this question: do we know today that we will
> > > *never* want the reporter to write asynchronously?
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> >
>


-- 
Thanks
Magesh

*Magesh Nandakumar*
Software Engineer
mage...@confluent.io


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Randall Hauch
On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
mage...@confluent.io> wrote:

> Randall
>
> Thanks a lot for your thoughts. I was wondering if we would ever have to
> make the API asynchronous, we could expose it as a new method right? If
> that's a possibility would it be better if the API explicitly has semantics
> of a synchronous API if the implementation is indeed going to be
> synchronous.
>

Thanks, Magesh.

I think it's likely that the implementation may need to be synchronous to
some degree. For example, just to keep the implementation simple we might
block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns and
latch until the reporter has received all acks, especially if that
simplifies the offset management and commit logic.

Even if that's the case, having each `report(...)` call be asynchronous
means that the sink task doesn't *have* to wait until each failed record
has been recorded to continue sending valid records to the external system.
Consider an example with 1000 records in a batch, where only the first
record has an error. If `report(...)` were synchronous, the `put(...)`
method would block reporting the first record and would then only send the
remaining 999 records after that's happened. With an asynchronous `report(...)`
method, the `put(...)` method could report the first record, send the 999
records, and then wait for the futures returned by the report method.
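
For illustration, the 1000-record example could look roughly like this inside a
task's put() (a sketch that assumes the `reporter` field and `process(...)` helper
used in the examples earlier in this thread, plus a Future-returning report method):

@Override
public void put(Collection<SinkRecord> records) {
    List<Future<?>> reported = new ArrayList<>();
    for (SinkRecord record : records) {
        try {
            process(record);                           // attempt to deliver to the external system
        } catch (Exception e) {
            reported.add(reporter.report(record, e));  // non-blocking: report and keep going
        }
    }
    // Only now wait for the errant-record writes, after the valid records were sent.
    for (Future<?> f : reported) {
        try {
            f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new ConnectException("Failed to report errant record", e);
        }
    }
}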


>
> On Sun, May 17, 2020, 9:27 AM Randall Hauch  wrote:
>
> > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > mage...@confluent.io> wrote:
> >
> > > Thanks Randall. The suggestion i made also has a problem when reporter
> > > isn't enabled where it could potentially write records after error
> > records
> > > to sink before failing.
> > >
> > > The other concern i had with reporter being asynchronous. For some
> reason
> > > if the reporter is taking longer because of say a specific broker
> issue,
> > > the connector might still move forward and commit if it's not waiting
> for
> > > the reporter.  During  this if the worker crashes we will now lose the
> > bad
> > > record
> > >  I don't think this is desirable behavior. I think the synchronous
> > reporter
> > > provides better guarantees for all connectors.
> > >
> > >
> > Thanks, Magesh.
> >
> > That's a valid concern, and maybe that will affect how the feature is
> > actually implemented. I expect it to be a bit tricky to ensure that
> errant
> > records are fully written to Kafka before the offsets are committed, so
> it
> > might be simplest to start out with a synchronous implementation. But the
> > API can still be an asynchronous design whether or not the implementation
> > is synchronous. That gives us the ability in the future to change the
> > implementation if we determine a way to handle all concerns. For example,
> > the WorkerSinkTask may need to backoff if waiting to commit due to too
> many
> > incomplete/unacknowledged reporter requests. OTOH, if we make the
> `report`
> > method(s) synchronous from the beginning, it will be very challenging to
> > change them in the future to be asynchronous.
> >
> > I guess it boils down to this question: do we know today that we will
> > *never* want the reporter to write asynchronously?
> >
> > Best regards,
> >
> > Randall
> >
>


[jira] [Created] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-17 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10010:
---

 Summary: Should close standby task for safety during HandleLostAll
 Key: KAFKA-10010
 URL: https://issues.apache.org/jira/browse/KAFKA-10010
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


The current handleLostAll logic doesn't close standby tasks, which could potentially 
lead to a tricky condition like the one below:



1. The standby task was initializing in `CREATED` state, and a task corrupted 
exception was thrown from registerStateStores

2. The task corrupted exception was caught, and a commit of the non-affected tasks was attempted

3. The task commit failed due to a task migrated exception

4. handleLostAll didn't close the standby task, leaving it in CREATED state

5. After the next rebalance completed, the same task was assigned back as a standby task.

6. An IllegalArgumentException was caught:
{code:java}
[2020-05-16T11:56:18-07:00] 
(streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
18:56:18,050] ERROR 
[stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
stream-thread 
[stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
Encountered the following exception during processing and the thread is going 
to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-16T11:56:18-07:00] 
(streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
java.lang.IllegalArgumentException: stream-thread 
[stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
been registered.
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
        at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
        at 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
        at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
        at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
        at 
org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
{code}
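
A rough sketch of the safety change this suggests (illustrative only; the helper names are hypothetical, not the actual implementation):
{code:java}
// Illustrative: close *all* tasks dirty on lost-all, standbys included, so a standby
// that is re-assigned later never starts from a half-initialized CREATED state with
// partially registered state stores.
void handleLostAll() {
    for (final Task task : allOwnedTasks()) {       // hypothetical helper: active + standby
        task.closeDirty();
        removeFromBookkeeping(task.id());           // hypothetical helper
    }
}
{code}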



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Randall Hauch
Thanks, Aakash.

After thinking about my previous proposal, I would like to retract the
suggestion of adding the `report(SinkRecord, Throwable, Callback)` method
from the new interface for the following reasons:

   1. The `Callback` interface requires the sink task developer to handle
   an error being passed to the callback, but in this case it's very unlikely
   the Connect runtime will ever call the callback with an error and will
   instead handle it (e.g., retry forever, or fail the task, etc.). IOW, the
   `Callback` interface is appropriate for `Producer`, but it's far broader
   than is needed here.
   2. Without the callback, the Connect runtime will not run task code
   within the DLQ producer thread, and this is very safe. But when a callback
   is provided, that callback *will* be called on the DLQ producer's thread --
   and any mistakes in the sink task's callback may block the DLQ. IOW, having
   a callback is too risky.
   3. We actually don't know that a callback is even necessary.
   4. Just having the one `report(SinkRecord, Throwable)` is simpler and,
   given the looming deadline, much closer to what we've already discussed.

My previous proposal also used "NoSuchClassError" in the JavaDoc and
example, but that was a typo. Per Arjun's message, the actual exception
class should be "NoClassDefFoundError".

I made a few others suggestions yesterday about some other parts of the
KIP, and hope those will be considered as well.

Best regards,

Randall
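
With that correction, the guarded lookup in a connector that also wants to run on
older workers would look roughly like this (a sketch using the method name discussed
in this thread):

private ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
    try {
        reporter = context.failedRecordReporter(); // may be null if the DLQ is not enabled
    } catch (NoClassDefFoundError | NoSuchMethodError e) {
        // Running on a Connect worker older than this feature
        reporter = null;
    }
}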

On Sun, May 17, 2020 at 12:34 PM Aakash Shah  wrote:

> Hi Randall,
>
> Thanks for the suggestions. Now that we are adding an interface, I think it
> is a good idea to overload the report method to support both cases.
>
> > I guess it boils down to this question: do we know today that we will
> > *never* want the reporter to write asynchronously?
>
> Originally, I believe the idea was to initially implement a synchronous
> design in order to ensure the guarantees mentioned by Magesh, and then
> introduce a new method in a subsequent KIP for an asynchronous error
> reporter if there were requests for it. But I agree with what you're
> saying, if we can design it asynchronously, then it opens up the
> possibilities to implement it in multiple ways in the future.
>
> Thanks,
> Aakash
>
> On Sun, May 17, 2020 at 9:04 AM Randall Hauch  wrote:
>
> > Thanks, Arjun! This has been very helpful.
> >
> > Looking in your POC and thinking in terms of the current KIP, it sounds
> > like the suggestion is to keep the same method signature for reporting
> > errors, but to change from the `BiFunction > Future` to a new `ErrantRecordReporter` interface. More concretely,
> > I'd suggest the KIP propose adding the following new interface:
> >
> > /**
> >  * Component that the sink task can use as it {@link SinkTask#put(/**
> >  * Reporter of problematic records and the corresponding problems.
> >  *
> >  * @since 2.9
> >  */
> > public interface ErrantRecordReporter {
> >
> > /**
> >  * Report a problematic record and the corresponding error to be
> > written to the sink
> >  * connector's dead letter queue (DLQ).
> >  *
> >  * This call is asynchronous and returns a {@link
> > java.util.concurrent.Future Future} for the
> >  * {@link RecordMetadata} that will be assigned to the record in the
> > DLQ topic. Invoking
> >  * {@link java.util.concurrent.Future#get() get()} on this future
> will
> > block until the
> >  * record has been written and then return the metadata for the
> record
> >  * or throw any exception that occurred while sending the record.
> >  * If you want to simulate a simple blocking call you can call the
> > get() method
> >  * immediately.
> >  *
> >  * @param record the problematic record; may not be null
> >  * @param error  the error capturing the problem with the record; may
> > not be null
> >  * @return a future that can be used to block until the record and
> > error are written
> >  * to the DLQ
> >  * @since 2.9
> >  */
> > default Future report(SinkRecord record, Throwable
> > error) {
> > return report(record, error, null);
> > }
> >
> > /**
> >  * Report a problematic record and the corresponding error to be
> > written to the sink
> >  * connector's dead letter queue (DLQ).
> >  *
> >  * This call is asynchronous and returns a {@link
> > java.util.concurrent.Future Future} for the
> >  * {@link RecordMetadata} that will be assigned to the record in the
> > DLQ topic. Invoking
> >  * {@link java.util.concurrent.Future#get() get()} on this future
> will
> > block until the
> >  * record has been written and then return the metadata for the
> record
> >  * or throw any exception that occurred while sending the record.
> >  * If you want to simulate a simple blocking call you can call the
> > get() method
> >  * immediately.
> >  *
> >  * Fully non-blocking usage can make use of the 

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
Randall

Thanks a lot for your thoughts. I was wondering: if we ever had to
make the API asynchronous, couldn't we expose it as a new method? If
that's a possibility, would it be better for the API to explicitly have the
semantics of a synchronous API, given that the implementation is indeed going
to be synchronous?

On Sun, May 17, 2020, 9:27 AM Randall Hauch  wrote:

> On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > Thanks Randall. The suggestion i made also has a problem when reporter
> > isn't enabled where it could potentially write records after error
> records
> > to sink before failing.
> >
> > The other concern i had with reporter being asynchronous. For some reason
> > if the reporter is taking longer because of say a specific broker issue,
> > the connector might still move forward and commit if it's not waiting for
> > the reporter.  During  this if the worker crashes we will now lose the
> bad
> > record
> >  I don't think this is desirable behavior. I think the synchronous
> reporter
> > provides better guarantees for all connectors.
> >
> >
> Thanks, Magesh.
>
> That's a valid concern, and maybe that will affect how the feature is
> actually implemented. I expect it to be a bit tricky to ensure that errant
> records are fully written to Kafka before the offsets are committed, so it
> might be simplest to start out with a synchronous implementation. But the
> API can still be an asynchronous design whether or not the implementation
> is synchronous. That gives us the ability in the future to change the
> implementation if we determine a way to handle all concerns. For example,
> the WorkerSinkTask may need to backoff if waiting to commit due to too many
> incomplete/unacknowledged reporter requests. OTOH, if we make the `report`
> method(s) synchronous from the beginning, it will be very challenging to
> change them in the future to be asynchronous.
>
> I guess it boils down to this question: do we know today that we will
> *never* want the reporter to write asynchronously?
>
> Best regards,
>
> Randall
>


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
Hi Randall,

Thanks for the suggestions. Now that we are adding an interface, I think it
is a good idea to overload the report method to support both cases.

> I guess it boils down to this question: do we know today that we will
> *never* want the reporter to write asynchronously?

Originally, I believe the idea was to initially implement a synchronous
design in order to ensure the guarantees mentioned by Magesh, and then
introduce a new method in a subsequent KIP for an asynchronous error
reporter if there were requests for it. But I agree with what you're
saying: if we can design it asynchronously, it opens up the possibility
of implementing it in multiple ways in the future.

Thanks,
Aakash

On Sun, May 17, 2020 at 9:04 AM Randall Hauch  wrote:

> Thanks, Arjun! This has been very helpful.
>
> Looking in your POC and thinking in terms of the current KIP, it sounds
> like the suggestion is to keep the same method signature for reporting
> errors, but to change from the `BiFunction Future` to a new `ErrantRecordReporter` interface. More concretely,
> I'd suggest the KIP propose adding the following new interface:
>
> /**
>  * Component that the sink task can use as it {@link SinkTask#put(/**
>  * Reporter of problematic records and the corresponding problems.
>  *
>  * @since 2.9
>  */
> public interface ErrantRecordReporter {
>
> /**
>  * Report a problematic record and the corresponding error to be
> written to the sink
>  * connector's dead letter queue (DLQ).
>  *
>  * This call is asynchronous and returns a {@link
> java.util.concurrent.Future Future} for the
>  * {@link RecordMetadata} that will be assigned to the record in the
> DLQ topic. Invoking
>  * {@link java.util.concurrent.Future#get() get()} on this future will
> block until the
>  * record has been written and then return the metadata for the record
>  * or throw any exception that occurred while sending the record.
>  * If you want to simulate a simple blocking call you can call the
> get() method
>  * immediately.
>  *
>  * @param record the problematic record; may not be null
>  * @param error  the error capturing the problem with the record; may
> not be null
>  * @return a future that can be used to block until the record and
> error are written
>  * to the DLQ
>  * @since 2.9
>  */
> default Future report(SinkRecord record, Throwable
> error) {
> return report(record, error, null);
> }
>
> /**
>  * Report a problematic record and the corresponding error to be
> written to the sink
>  * connector's dead letter queue (DLQ).
>  *
>  * This call is asynchronous and returns a {@link
> java.util.concurrent.Future Future} for the
>  * {@link RecordMetadata} that will be assigned to the record in the
> DLQ topic. Invoking
>  * {@link java.util.concurrent.Future#get() get()} on this future will
> block until the
>  * record has been written and then return the metadata for the record
>  * or throw any exception that occurred while sending the record.
>  * If you want to simulate a simple blocking call you can call the
> get() method
>  * immediately.
>  *
>  * Fully non-blocking usage can make use of the {@link Callback}
> parameter to provide a
>  * callback that will be invoked when the request is complete.
> Callbacks for records being
>  * sent to the same partition are guaranteed to execute in order.
>  *
>  * @param record   the problematic record; may not be null
>  * @param errorthe error capturing the problem with the record; may
> not be null
>  * @param callback A user-supplied callback to execute when the record
> has been acknowledged
>  * by the server; may be null for no callback
>  * @return a future that can be used to block until the record and
> error are written
>  * to the DLQ
>  * @since 2.9
>  */
> Future report(SinkRecord record, Throwable error,
> Callback callback);
> }
>
> and then modify the proposed changed to the SinkTaskContext to be:
>
> public interface SinkTaskContext {
> ...
> /**
>  * Get the reporter to which the sink task can report problematic
> {@link SinkRecord} objects
>  * passed to the {@link SinkTask#put(Collection)} method.
>  *
>  * This method was added in Apache Kafka 2.9. Sink tasks that use
> this method and want to
>  * maintain backward compatibility so they can also be installed in
> older Connect runtimes
>  * should guard its use with a try-catch block, since calling this
> method will result in a
>  * {@link NoSuchClassError} or {@link NoSuchMethodError} when the sink
> connector is
>  * used in Connect runtimes older than Kafka 2.9. For example:
>  * 
>  * ErrantRecordReporter reporter;
>  * try {
>  * reporter = context.failedRecordReporter();
>  * } catch (NoSuchClassError | 

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Randall Hauch
On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
mage...@confluent.io> wrote:

> Thanks Randall. The suggestion i made also has a problem when reporter
> isn't enabled where it could potentially write records after error records
> to sink before failing.
>
> The other concern i had with reporter being asynchronous. For some reason
> if the reporter is taking longer because of say a specific broker issue,
> the connector might still move forward and commit if it's not waiting for
> the reporter.  During  this if the worker crashes we will now lose the bad
> record
>  I don't think this is desirable behavior. I think the synchronous reporter
> provides better guarantees for all connectors.
>
>
Thanks, Magesh.

That's a valid concern, and maybe that will affect how the feature is
actually implemented. I expect it to be a bit tricky to ensure that errant
records are fully written to Kafka before the offsets are committed, so it
might be simplest to start out with a synchronous implementation. But the
API can still be an asynchronous design whether or not the implementation
is synchronous. That gives us the ability in the future to change the
implementation if we determine a way to handle all concerns. For example,
the WorkerSinkTask may need to backoff if waiting to commit due to too many
incomplete/unacknowledged reporter requests. OTOH, if we make the `report`
method(s) synchronous from the beginning, it will be very challenging to
change them in the future to be asynchronous.
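
One way to picture that back-off (purely a sketch with hypothetical names): bound the
number of unacknowledged reporter requests with a semaphore, so the task slows down
instead of racing ahead of the DLQ producer:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: at most maxInFlight errant-record writes may be outstanding;
// report() blocks (backs off) once the limit is reached, and a permit is released
// when the DLQ write completes.
public class BoundedReporter {

    private final Semaphore inFlight;

    public BoundedReporter(int maxInFlight) {
        this.inFlight = new Semaphore(maxInFlight);
    }

    public CompletableFuture<Void> report(Runnable dlqWrite) throws InterruptedException {
        inFlight.acquire(); // back off here if too many writes are unacknowledged
        return CompletableFuture.runAsync(dlqWrite)
                .whenComplete((ignored, error) -> inFlight.release());
    }
}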

I guess it boils down to this question: do we know today that we will
*never* want the reporter to write asynchronously?

Best regards,

Randall


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Randall Hauch
Thanks, Arjun! This has been very helpful.

Looking at your POC and thinking in terms of the current KIP, it sounds
like the suggestion is to keep the same method signature for reporting
errors, but to change from the `BiFunction`-based reporter to a new
`ErrantRecordReporter` interface. More concretely,
I'd suggest the KIP propose adding the following new interface:

/**
 * Reporter of problematic records and the corresponding problems.
 *
 * @since 2.9
 */
public interface ErrantRecordReporter {

/**
 * Report a problematic record and the corresponding error to be
written to the sink
 * connector's dead letter queue (DLQ).
 *
 * This call is asynchronous and returns a {@link
java.util.concurrent.Future Future} for the
 * {@link RecordMetadata} that will be assigned to the record in the
DLQ topic. Invoking
 * {@link java.util.concurrent.Future#get() get()} on this future will
block until the
 * record has been written and then return the metadata for the record
 * or throw any exception that occurred while sending the record.
 * If you want to simulate a simple blocking call you can call the
get() method
 * immediately.
 *
 * @param record the problematic record; may not be null
 * @param error  the error capturing the problem with the record; may
not be null
 * @return a future that can be used to block until the record and
error are written
 * to the DLQ
 * @since 2.9
 */
default Future<RecordMetadata> report(SinkRecord record, Throwable
error) {
return report(record, error, null);
}

/**
 * Report a problematic record and the corresponding error to be
written to the sink
 * connector's dead letter queue (DLQ).
 *
 * This call is asynchronous and returns a {@link
java.util.concurrent.Future Future} for the
 * {@link RecordMetadata} that will be assigned to the record in the
DLQ topic. Invoking
 * {@link java.util.concurrent.Future#get() get()} on this future will
block until the
 * record has been written and then return the metadata for the record
 * or throw any exception that occurred while sending the record.
 * If you want to simulate a simple blocking call you can call the
get() method
 * immediately.
 *
 * Fully non-blocking usage can make use of the {@link Callback}
parameter to provide a
 * callback that will be invoked when the request is complete.
Callbacks for records being
 * sent to the same partition are guaranteed to execute in order.
 *
 * @param record   the problematic record; may not be null
 * @param errorthe error capturing the problem with the record; may
not be null
 * @param callback A user-supplied callback to execute when the record
has been acknowledged
 * by the server; may be null for no callback
 * @return a future that can be used to block until the record and
error are written
 * to the DLQ
 * @since 2.9
 */
Future<RecordMetadata> report(SinkRecord record, Throwable error,
Callback callback);
}

and then modify the proposed changes to the SinkTaskContext to be:

public interface SinkTaskContext {
...
/**
 * Get the reporter to which the sink task can report problematic
{@link SinkRecord} objects
 * passed to the {@link SinkTask#put(Collection)} method.
 *
 * This method was added in Apache Kafka 2.9. Sink tasks that use
this method and want to
 * maintain backward compatibility so they can also be installed in
older Connect runtimes
 * should guard its use with a try-catch block, since calling this
method will result in a
 * {@link NoSuchClassError} or {@link NoSuchMethodError} when the sink
connector is
 * used in Connect runtimes older than Kafka 2.9. For example:
 * 
 * ErrantRecordReporter reporter;
 * try {
 * reporter = context.failedRecordReporter();
 * } catch (NoSuchClassError | NoSuchMethodError e) {
 * reporter = null;
 * }
 * 
 *
 * @return the reporter function; null if no error reporter has been
configured for the connector
 * @since 2.9
 */
ErrantRecordReporter failedRecordReporter();
}

The example usage then becomes:

private ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
reporter = context.failedRecordReporter(); // may be null if DLQ not
enabled
  } catch (NoSuchClassError | NoSuchMethodError e) {
// Will occur in Connect runtimes earlier than 2.9
reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record: records) {
try {
  // attempt to send record to data sink
  process(record);
} catch(Exception e) {
  if (reporter != null) {
// Send errant record to error reporter
reporter.report(record, e, (metadata, error) -> {
// do something
});
  

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
Thanks, Randall. The suggestion I made also has a problem when the reporter
isn't enabled: it could potentially write records that come after the error
records to the sink before failing.

The other concern I had is with the reporter being asynchronous. If the
reporter is taking longer because of, say, a specific broker issue, the
connector might still move forward and commit if it's not waiting for the
reporter. If the worker crashes during that window, we will lose the bad
record. I don't think this is desirable behavior. I think the synchronous
reporter provides better guarantees for all connectors.
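
For reference, a task that wants that synchronous guarantee can still get it from a
Future-based API by blocking on each report immediately (a sketch assuming the
`reporter` and `process(...)` helpers from the examples earlier in this thread):

for (SinkRecord record : records) {
    try {
        process(record);
    } catch (Exception e) {
        try {
            // Block until the errant record is durably written before handling the next
            // record, so a worker crash cannot lose the bad record once offsets commit.
            reporter.report(record, e).get();
        } catch (InterruptedException | ExecutionException reportFailure) {
            throw new ConnectException("Failed to report errant record", reportFailure);
        }
    }
}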

On Sun, May 17, 2020, 8:02 AM Randall Hauch  wrote:

> Magesh, we have talked above overloading various existing SinkTask methods,
> and we concluded that this style of evolution complicates migration,
> whereas providing the reporter via the context follows existing patterns in
> the API and simplifies backward compatibility concerns. Arjun's research
> shows that we can even introduce a new interface type and sink connector
> developers can very easily accommodate their sink connector implementations
> running in newer and older versions of the Connect runtime.
>
> Best regards,
>
> Randall
>
> On Sun, May 17, 2020 at 3:26 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > Have we considered returning error records by overriding flush/precommit?
> > If we think aesthetics is important this on my opinion is one possible
> > abstractions that could be cleaner. This would also mean that connector
> > developers wouldn't have to worry about a new reporter or think if its
> > synchronous or not synchronous. If error records are available and thedlq
> > isn't enabled framework can possibly fail the task. Alternatively, we
> could
> > also have an overloaded put return error records or even introduce a new
> > errorRecords that gets invoked after put.
> >
> > On Sat, May 16, 2020, 2:37 PM Aakash Shah  wrote:
> >
> > > Hi Randall,
> > >
> > > Thanks for the suggestion. I've updated the KIP with the agreed upon
> > > changes as well as the new suggestions Randall mentioned:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > >
> > >
> > > Please let me know what you think.
> > >
> > > Thanks,
> > > Aakash
> > >
> > > On Sat, May 16, 2020 at 12:34 PM Randall Hauch 
> wrote:
> > >
> > > > Thanks again for the active discussion!
> > > >
> > > > Regarding the future-vs-callback discussion: I did like where Chris
> was
> > > > going with the Callback, but he raises good point that it's unclear
> > what
> > > to
> > > > use for the reporter type, since we'd need three parameters.
> > Introducing
> > > a
> > > > new interface makes it much harder for a sink task to be backward
> > > > compatible, so sticking with BiFunction is a good compromise. Plus,
> > > another
> > > > significant disadvantage of a callback approach is that a sink task's
> > > > callback is called from the producer thread, and this risks a
> > > > poorly written sink task callback killing the reporter's producer
> > without
> > > > necessarily failing the task. Using a future avoids this risk
> > altogether,
> > > > still provides the sink task with the ability to do synchronous
> > reporting
> > > > using Future, which is a standard and conventional design pattern. So
> > we
> > > do
> > > > seem to have converged on using `BiFunction > > > Future>` for the reporter type.
> > > >
> > > > Now, we still seem to not have converted upon how to pass the
> reporter
> > to
> > > > the sink task. I agree with Konstantine that the deprecation affects
> > only
> > > > newer versions of Connect, and that a sink task should deal with both
> > put
> > > > methods only when it wants to support older runtimes. I also think
> that
> > > > this is a viable approach, but I do concede that this evolution of
> the
> > > sink
> > > > task API is more complicated than it should be.
> > > >
> > > > In the interest of quickly coming to consensus on how we pass the
> > > reporter
> > > > to the sink task, I'd like to go back to Andrew's original
> suggestion,
> > > > which I think we disregarded too quickly: add a getter on the
> > > > SinkTaskContext interface. We already have precedent for adding
> methods
> > > to
> > > > one of the context classes with the newly-adopted KIP-131, which
> adds a
> > > > getter for the OffsetStorageReader on the (new)
> SourceConnectorContext.
> > > > That KIP accepts the fact that a source connector wanting to use this
> > > > feature while also keeping the ability to be installed into older
> > Connect
> > > > runtimes must guard its use of the context's getter method.
> > > >
> > > > I think we can use the same pattern for this KIP, and add a getter to
> > the
> > > > existing SinkTaskContext that is defined something like:
> > > >
> > > > public interface SinkTaskContext {
> > > > ...
> > > > /**
> > > >  * Get the reporter to which the sink task can 

Re: Help! Can't add reviewers for Github Kafka PR

2020-05-17 Thread John Roesler
Hi Kowshik,

You just have to “@“ mention the username of the person you want in a gh 
comment. I think you have to be a committee to add labels, reviewers, etc. 

Hope this helps!
-John

On Sun, May 17, 2020, at 04:11, Kowshik Prakasam wrote:
> Hi all,
> 
> My intent is to create a PR for review in https://github.com/apache/kafka .
> However I find that I'm unable to add reviewers to my PR. Does this need
> any specific permissions? If so, please could someone grant me access or
> help me understand what I need to do to get permissions to add reviewers?
> 
> 
> Cheers,
> Kowshik
>


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Randall Hauch
Magesh, we have talked about overloading various existing SinkTask methods,
and we concluded that this style of evolution complicates migration,
whereas providing the reporter via the context follows existing patterns in
the API and simplifies backward compatibility concerns. Arjun's research
shows that we can even introduce a new interface type and sink connector
developers can very easily accommodate their sink connector implementations
running in newer and older versions of the Connect runtime.

Best regards,

Randall

On Sun, May 17, 2020 at 3:26 AM Magesh kumar Nandakumar <
mage...@confluent.io> wrote:

> Have we considered returning error records by overriding flush/precommit?
> If we think aesthetics is important this on my opinion is one possible
> abstractions that could be cleaner. This would also mean that connector
> developers wouldn't have to worry about a new reporter or think if its
> synchronous or not synchronous. If error records are available and thedlq
> isn't enabled framework can possibly fail the task. Alternatively, we could
> also have an overloaded put return error records or even introduce a new
> errorRecords that gets invoked after put.
>
> On Sat, May 16, 2020, 2:37 PM Aakash Shah  wrote:
>
> > Hi Randall,
> >
> > Thanks for the suggestion. I've updated the KIP with the agreed upon
> > changes as well as the new suggestions Randall mentioned:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> >
> >
> > Please let me know what you think.
> >
> > Thanks,
> > Aakash
> >
> > On Sat, May 16, 2020 at 12:34 PM Randall Hauch  wrote:
> >
> > > Thanks again for the active discussion!
> > >
> > > Regarding the future-vs-callback discussion: I did like where Chris was
> > > going with the Callback, but he raises good point that it's unclear
> what
> > to
> > > use for the reporter type, since we'd need three parameters.
> Introducing
> > a
> > > new interface makes it much harder for a sink task to be backward
> > > compatible, so sticking with BiFunction is a good compromise. Plus,
> > another
> > > significant disadvantage of a callback approach is that a sink task's
> > > callback is called from the producer thread, and this risks a
> > > poorly written sink task callback killing the reporter's producer
> without
> > > necessarily failing the task. Using a future avoids this risk
> altogether,
> > > still provides the sink task with the ability to do synchronous
> reporting
> > > using Future, which is a standard and conventional design pattern. So
> we
> > do
> > > seem to have converged on using `BiFunction > > Future>` for the reporter type.
> > >
> > > Now, we still seem to not have converted upon how to pass the reporter
> to
> > > the sink task. I agree with Konstantine that the deprecation affects
> only
> > > newer versions of Connect, and that a sink task should deal with both
> put
> > > methods only when it wants to support older runtimes. I also think that
> > > this is a viable approach, but I do concede that this evolution of the
> > sink
> > > task API is more complicated than it should be.
> > >
> > > In the interest of quickly coming to consensus on how we pass the
> > reporter
> > > to the sink task, I'd like to go back to Andrew's original suggestion,
> > > which I think we disregarded too quickly: add a getter on the
> > > SinkTaskContext interface. We already have precedent for adding methods
> > to
> > > one of the context classes with the newly-adopted KIP-131, which adds a
> > > getter for the OffsetStorageReader on the (new) SourceConnectorContext.
> > > That KIP accepts the fact that a source connector wanting to use this
> > > feature while also keeping the ability to be installed into older
> Connect
> > > runtimes must guard its use of the context's getter method.
> > >
> > > I think we can use the same pattern for this KIP, and add a getter to
> the
> > > existing SinkTaskContext that is defined something like:
> > >
> > > public interface SinkTaskContext {
> > > ...
> > > /**
> > >  * Get the reporter to which the sink task can report problematic
> or
> > > failed {@link SinkRecord}
> > >  * passed to the {@link SinkTask#put(Collection)} method. When
> > > reporting a failed record,
> > >  * the sink task will receive a {@link Future} that the task can
> > > optionally use to wait until
> > >  * the failed record and exception have been written to Kafka via
> > > Connect's DLQ. Note that
> > >  * the result of this method may be null if this connector has not
> > been
> > > configured with a DLQ.
> > >  *
> > >  * This method was added in Apache Kafka 2.9. Sink tasks that
> use
> > > this method but want to
> > >  * maintain backward compatibility so they can also be deployed to
> > > older Connect runtimes
> > >  * should guard the call to this method with a try-catch block,
> since
> > > calling this method will result in a
> > >  * {@link 

Help! Can't add reviewers for Github Kafka PR

2020-05-17 Thread Kowshik Prakasam
Hi all,

My intent is to create a PR for review in https://github.com/apache/kafka .
However I find that I'm unable to add reviewers to my PR. Does this need
any specific permissions? If so, please could someone grant me access or
help me understand what I need to do to get permissions to add reviewers?


Cheers,
Kowshik


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-05-17 Thread Alexandre Dupriez
Hi Satish,

Thank you for your updates.

I have some questions around potential use cases when unclean leader
election is enabled.

It is possible that a range of offsets of a segment which has already been
offloaded to tiered storage is included in the range of offsets to be
truncated. A follower, which is far behind its leader and is uncleanly
becoming the topic-partition leader replica, may even require new
followers to discard entire segments which were previously offloaded.

It seldom happens in practice, yet it is a valid case which is
handled by the log truncation semantics (as described in KIP-279), and
is required to prevent divergence of log lineage between replicas
after clean and unclean election of new leaders.

The log truncation algorithm is currently implemented for local
(non-tiered) log segments. How should remote segments, which would be
truncated if they were local, be processed?

Applying the same type of transformation to tiered segments (that is,
deletion and truncation of files) would generate additional traffic,
make leader migration more resource-intensive and time-consuming, and
introduce significant delays which are not incurred with local file
system operations.

Another approach could be to design the remote segment metadata such that
it can handle truncation use cases. It would need to support
segments which are part of a discarded log lineage, and support
segment truncation.

In the case of local segments, all modifications to a log are protected
by a log-wide lock. The same determinism and associated semantics
need to be preserved for remote segments, which makes updating the
metadata an integral part of log modifications. We need to think about
how such updates are propagated to other brokers such that replication
semantics are not violated. For instance, when a replica is being
truncated, it should not be possible for consumers to access the
replica under truncation from another broker via their common storage
tier.

This assumes the same guarantees on log lineage apply to remote
segments as they do for local segments, including under unclean leader
election.

I think these guarantees are too fundamental to tolerate a diverging
behaviour depending on whether a segment is “local” or “remote”.

What do you think?

Thanks,
Alexandre
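
To illustrate the second approach, here is one possible shape for lineage-aware
remote segment metadata (purely illustrative, not the RLMM API proposed in the KIP):

import java.util.UUID;

// Purely illustrative sketch: remote segment metadata that carries enough lineage
// information to support truncation without immediately rewriting or deleting
// remote files.
public class RemoteSegmentLineageEntry {

    enum State { ACTIVE, MARKED_FOR_TRUNCATION, DELETED }

    final UUID segmentId;
    final long baseOffset;
    final long endOffset;
    final int leaderEpoch;   // epoch of the leader that offloaded the segment
    State state;

    RemoteSegmentLineageEntry(UUID segmentId, long baseOffset, long endOffset, int leaderEpoch) {
        this.segmentId = segmentId;
        this.baseOffset = baseOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.state = State.ACTIVE;
    }

    // After an unclean election, segments whose offsets fall beyond the new leader's
    // truncation point are only marked; physical deletion can happen lazily, and
    // fetches must ignore non-ACTIVE entries so consumers never see discarded lineage.
    void truncateAbove(long truncationOffset) {
        if (baseOffset >= truncationOffset) {
            state = State.MARKED_FOR_TRUNCATION;
        }
    }
}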

On Thu, May 14, 2020 at 18:38, Satish Duggana  wrote:
>
> Hi Jun,
> Thanks for your comments.  We updated the KIP with more details.
>
> >100. For each of the operations related to tiering, it would be useful to
> provide a description on how it works with the new API. These include
> things like consumer fetch, replica fetch, offsetForTimestamp, retention
> (remote and local) by size, time and logStartOffset, topic deletion, etc.
> This will tell us if the proposed APIs are sufficient.
>
> We addressed most of these APIs in the KIP. We can add more details if
> needed.
>
> >101. For the default implementation based on internal topic, is it meant
> as a proof of concept or for production usage? I assume that it's the
> former. However, if it's the latter, then the KIP needs to describe the
> design in more detail.
>
> It is production usage as was mentioned in an earlier mail. We plan to
> update this section in the next few days.
>
> >102. When tiering a segment, the segment is first written to the object
> store and then its metadata is written to RLMM using the api "void
> putRemoteLogSegmentData()".
> One potential issue with this approach is that if the system fails after
> the first operation, it leaves a garbage in the object store that's never
> reclaimed. One way to improve this is to have two separate APIs, sth like
> preparePutRemoteLogSegmentData() and commitPutRemoteLogSegmentData().
>
> That is a good point. We currently have a different way using markers in
> the segment but your suggestion is much better.
>
> >103. It seems that the transactional support and the ability to read from
> follower are missing.
>
> KIP is updated with transactional support, follower fetch semantics, and
> reading from a follower.
>
> >104. It would be useful to provide a testing plan for this KIP.
>
> We added a few tests by introducing test util for tiered storage in the PR.
> We will provide the testing plan in the next few days.
>
> Thanks,
> Satish.
>
>
> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani  wrote:
>
> >
> >
> >
> >
> > On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao  wrote:
> >
> >> Hi, Satish,
> >>
> >> Thanks for the updated doc. The new API seems to be an improvement
> >> overall. A few more comments below.
> >>
> >> 100. For each of the operations related to tiering, it would be useful to
> >> provide a description on how it works with the new API. These include
> >> things like consumer fetch, replica fetch, offsetForTimestamp, retention
> >> (remote and local) by size, time and logStartOffset, topic deletion, etc.
> >> This will tell us if the proposed APIs are sufficient.
> >>
> >
> > Thanks for the feedback Jun. We will add more details 

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
Have we considered returning error records by overriding flush/preCommit?
If we think aesthetics are important, this, in my opinion, is one possible
abstraction that could be cleaner. It would also mean that connector
developers wouldn't have to worry about a new reporter or think about whether
it is synchronous or not. If error records are returned and the DLQ
isn't enabled, the framework can possibly fail the task. Alternatively, we could
also have an overloaded put return error records, or even introduce a new
errorRecords method that gets invoked after put.
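
For illustration only, the overloaded-put alternative could look something like this
(hypothetical signatures, not part of the KIP):

import java.util.Collection;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical alternative API shape, for discussion only: put() hands back the records
// it could not deliver, together with the error, and the framework decides what to do
// with them (route them to the DLQ if configured, otherwise fail the task).
public abstract class SinkTaskWithErrantResults extends SinkTask {

    public static final class ErrantRecord {
        public final SinkRecord record;
        public final Throwable error;

        public ErrantRecord(SinkRecord record, Throwable error) {
            this.record = record;
            this.error = error;
        }
    }

    // Overload that returns the records that failed during this call.
    public abstract Collection<ErrantRecord> putWithResults(Collection<SinkRecord> records);

    @Override
    public void put(Collection<SinkRecord> records) {
        putWithResults(records); // default bridge for runtimes that ignore the results
    }
}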

On Sat, May 16, 2020, 2:37 PM Aakash Shah  wrote:

> Hi Randall,
>
> Thanks for the suggestion. I've updated the KIP with the agreed upon
> changes as well as the new suggestions Randall mentioned:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
>
>
> Please let me know what you think.
>
> Thanks,
> Aakash
>
> On Sat, May 16, 2020 at 12:34 PM Randall Hauch  wrote:
>
> > Thanks again for the active discussion!
> >
> > Regarding the future-vs-callback discussion: I did like where Chris was
> > going with the Callback, but he raises good point that it's unclear what
> to
> > use for the reporter type, since we'd need three parameters. Introducing
> a
> > new interface makes it much harder for a sink task to be backward
> > compatible, so sticking with BiFunction is a good compromise. Plus,
> another
> > significant disadvantage of a callback approach is that a sink task's
> > callback is called from the producer thread, and this risks a
> > poorly written sink task callback killing the reporter's producer without
> > necessarily failing the task. Using a future avoids this risk altogether,
> > still provides the sink task with the ability to do synchronous reporting
> > using Future, which is a standard and conventional design pattern. So we
> do
> > seem to have converged on using `BiFunction > Future>` for the reporter type.
> >
> > Now, we still seem to not have converted upon how to pass the reporter to
> > the sink task. I agree with Konstantine that the deprecation affects only
> > newer versions of Connect, and that a sink task should deal with both put
> > methods only when it wants to support older runtimes. I also think that
> > this is a viable approach, but I do concede that this evolution of the
> sink
> > task API is more complicated than it should be.
> >
> > In the interest of quickly coming to consensus on how we pass the
> reporter
> > to the sink task, I'd like to go back to Andrew's original suggestion,
> > which I think we disregarded too quickly: add a getter on the
> > SinkTaskContext interface. We already have precedent for adding methods
> to
> > one of the context classes with the newly-adopted KIP-131, which adds a
> > getter for the OffsetStorageReader on the (new) SourceConnectorContext.
> > That KIP accepts the fact that a source connector wanting to use this
> > feature while also keeping the ability to be installed into older Connect
> > runtimes must guard its use of the context's getter method.
> >
> > I think we can use the same pattern for this KIP, and add a getter to the
> > existing SinkTaskContext that is defined something like:
> >
> > public interface SinkTaskContext {
> > ...
> > /**
> >  * Get the reporter to which the sink task can report problematic or
> > failed {@link SinkRecord}
> >  * passed to the {@link SinkTask#put(Collection)} method. When
> > reporting a failed record,
> >  * the sink task will receive a {@link Future} that the task can
> > optionally use to wait until
> >  * the failed record and exception have been written to Kafka via
> > Connect's DLQ. Note that
> >  * the result of this method may be null if this connector has not
> been
> > configured with a DLQ.
> >  *
> >  * This method was added in Apache Kafka 2.9. Sink tasks that use
> > this method but want to
> >  * maintain backward compatibility so they can also be deployed to
> > older Connect runtimes
> >  * should guard the call to this method with a try-catch block, since
> > calling this method will result in a
> >  * {@link NoSuchMethodException} when the sink connector is deployed
> to
> > Connect runtimes
> >  * older than Kafka 2.9. For example:
> >  * 
> >  * BiFunctionSinkTask, Throwable, FutureVoid
> > reporter;
> >  * try {
> >  * reporter = context.failedRecordReporter();
> >  * } catch (NoSuchMethodException e) {
> >  * reporter = null;
> >  * }
> >  * 
> >  *
> >  * @return the reporter function; null if no error reporter has been
> > configured for the connector
> >  * @since 2.9
> >  */
> > BiFunction> failedRecordReporter();
> > }
> >
> > The main advantage is that the KIP no longer has to make *any other*
> > changes to the Sink Connector or Task API. The above is really the only
> > change, and it's merely an addition to the API. No