Re: Errors thrown from a KStream transformer are swallowed, eg. StackOverflowError

2021-11-24 Thread Luke Chen
Yes, I confirmed that in KAFKA-9331 (
https://github.com/apache/kafka/pull/9487), we removed the `catch
(exception)` block for adding uncaught exception handler.
Then, in KAFKA-12537(https://github.com/apache/kafka/pull/10387), we added
`catch (Throwable)` (in v2.8.0).

So, @Sinclair, if you upgrade to v2.8.0 or later, this issue should be
fixed.

Thank you.
Luke





On Sat, Nov 20, 2021 at 8:49 AM Matthias J. Sax  wrote:

> Not sure what version you are using, but it says `Throwable` in `trunk`
>
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L577
>
>
> -Matthias
>
> On 11/18/21 6:09 AM, John Roesler wrote:
> > Thanks for pointing that out, Scott!
> >
> > You’re totally right; that should be a Throwable.
> >
> > Just to put it out there, do you want to just send a quick PR? If not,
> no worries. I’m just asking because it seems like you’ve already done the
> hard part and it might be nice to get the contribution credit.
> >
> > Thanks,
> > John
> >
> > On Thu, Nov 18, 2021, at 08:00, Sinclair Scott wrote:
> >> Hi there,
> >>
> >>
> >> I'm a big fan of KStreams - thanks for all the great work!!
> >>
> >>
> >> I unfortunately (my fault) had a StackOverflowError bug in my KStream
> >> transformer which meant that the KStream died without reporting any
> >> Exception at all.
> >>
> >>
> >> The first log message showed some polling activity and then you see
> >> later the State transition to PENDING_SHUTDOWN
> >>
> >>
> >> Main Consumer poll completed in 2 ms and fetched 1 records
> >> Flushing all global globalStores registered in the state manager
> >> Idempotently invoking restoration logic in state RUNNING
> >> Finished restoring all changelogs []
> >> Idempotent restore call done. Thread state has not changed.
> >> Processing tasks with 1 iterations.
> >> Flushing all global globalStores registered in the state manager
> >> State transition from RUNNING to PENDING_SHUTDOWN
> >>
> >>
> >>
> >> This is because the StreamThread.run() method catches Exception only.
> >>
> >>
> >> I ended up recompiling the kstreams and changing the catch to Throwable
> >> so I can see what was going on. Then I discovered my bad recursive call
> >>   :(
> >>
> >>
> >> Can we please change the Catch to catch Throwable , so that we are
> >> always guaranteed some output?
> >>
> >>
> >> StreamThread.java
> >>
> >> @Override
> >> public void run() {
> >>     log.info("Starting");
> >>     if (setState(State.STARTING) == null) {
> >>         log.info("StreamThread already shutdown. Not running");
> >>         return;
> >>     }
> >>     boolean cleanRun = false;
> >>     try {
> >>         runLoop();
> >>         cleanRun = true;
> >>     } catch (final Exception e) {
> >>         // we have caught all Kafka related exceptions, and other runtime exceptions
> >>         // should be due to user application errors
> >>         if (e instanceof UnsupportedVersionException) {
> >>             final String errorMessage = e.getMessage();
> >>             if (errorMessage != null &&
> >>                 errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
> >>                 log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
> >>                     "Setting {}=\"{}\" requires broker version 2.5 or higher.",
> >>                     StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >>                     EXACTLY_ONCE_BETA);
> >>                 throw e;
> >>             }
> >>         }
> >>         log.error("Encountered the following exception during processing " +
> >>             "and the thread is going to shut down: ", e);
> >>         throw e;
> >>     } finally {
> >>         completeShutdown(cleanRun);
> >>     }
> >> }
> >>
> >>
> >> Thanks and kind regards
> >>
> >>
> >> Scott Sinclair
>
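[Editor's note] The failure mode discussed above can be reproduced in a few lines. A `StackOverflowError` is an `Error`, not an `Exception`, so a `catch (Exception)` block never sees it; only `catch (Throwable)` does. A minimal, self-contained sketch (illustrative only, not Kafka code):

```java
public class CatchDemo {

    // Mirrors the old StreamThread.run() behavior: only Exception is caught,
    // so JVM Errors such as StackOverflowError escape this method unlogged.
    static String runCatchingException(Runnable task) {
        try {
            task.run();
            return "clean";
        } catch (Exception e) {
            return "logged: " + e.getClass().getSimpleName();
        }
    }

    // Mirrors the fix: Throwable is the common supertype of Exception and
    // Error, so the StackOverflowError is caught and can be logged.
    static String runCatchingThrowable(Runnable task) {
        try {
            task.run();
            return "clean";
        } catch (Throwable t) {
            return "logged: " + t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        Runnable recursive = new Runnable() {
            @Override
            public void run() {
                run(); // unbounded recursion, like the bad transformer call
            }
        };
        // The Throwable variant observes the error instead of swallowing it:
        System.out.println(runCatchingThrowable(recursive)); // prints "logged: StackOverflowError"
    }
}
```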


Re: poll block question

2021-11-24 Thread Luke Chen
Hi xusheng,
I checked the stack trace and mapped it to the Kafka code. I can see we use
either Selector#selectNow or Selector#select(timeoutMs), which should never
hang forever.
If so, I think the `timeoutMs` might be too large; from your stack trace, the
timeout comes from Consumer#poll().
Could you reduce the poll timeout to see if that fixes the issue?

Also, I'm checking the code in v3.1, so it is also possible that there are
bugs in v2.3 that were fixed in a later release.

Thank you.
Luke
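[Editor's note] One way to confirm whether poll() is actually blocking (rather than returning empty and being re-entered) is a last-poll watchdog in the application. This is a hypothetical helper, not a Kafka API; `recordPoll` would be called each time `Consumer#poll()` returns:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical application-side helper (not part of the Kafka client):
// tracks when poll() last returned so a monitoring thread can flag a
// consumer whose poll() appears to be hanging forever.
public class PollWatchdog {
    private final AtomicLong lastPollMs;
    private final long maxGapMs;

    public PollWatchdog(long maxGapMs, long nowMs) {
        this.maxGapMs = maxGapMs;
        this.lastPollMs = new AtomicLong(nowMs);
    }

    // Call this right after each Consumer#poll() returns.
    public void recordPoll(long nowMs) {
        lastPollMs.set(nowMs);
    }

    // A monitoring thread calls this periodically; true means poll() has not
    // returned within maxGapMs and a thread dump is worth taking.
    public boolean looksStuck(long nowMs) {
        return nowMs - lastPollMs.get() > maxGapMs;
    }
}
```

Clock values are passed in explicitly so the logic is trivially testable; a real deployment would use `System.currentTimeMillis()`.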

On Wed, Nov 24, 2021 at 11:12 PM xusheng  wrote:

> hi
> I found something about poll that I'm not sure about, and I need some help.
> My Kafka server version is 1.1.0 and my Kafka client version is 2.3.0.
> Topic xxx has 128 partitions, and I start 128 threads in one application to
> consume messages.
> It always ran well, but after running for about a week, I found a consumer
> hung forever.
> Strangely, only one partition stopped consuming.
> I dumped the thread info:
>
> "ConsumerGroup-58" #302 prio=5 os_prio=0 tid=0x7f5419626800 nid=0x191d9 runnable [0x7f540c95]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x0004461055c8> (a sun.nio.ch.Util$3)
> - locked <0x0004461055b8> (a java.util.Collections$UnmodifiableSet)
> - locked <0x000449932670> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:794)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>


Re: KIP-800: Add reason to LeaveGroupRequest

2021-11-24 Thread Luke Chen
Hi David,
Sorry for the late reply.
Thanks for the update. It looks good now.
I love the idea to add joinGroup request reason, as well as the
`enforceRebalance` API!

One minor comment: since we extended the original KIP from the leaveGroup to
the joinGroup request, could you please update the KIP title and the
motivation section?
It would make the KIP much clearer for future reference.

Otherwise, LGTM!

Thank you.
Luke

On Fri, Nov 19, 2021 at 11:23 PM David Jacot 
wrote:

> I have updated the KIP.
>
> Best,
> David
>
> On Fri, Nov 19, 2021 at 3:00 PM David Jacot  wrote:
> >
> > Thank you all for your feedback. Let me address all your points below.
> >
> > Luke,
> > 1. I use a tag field so bumping the request version is not necessary. In
> > this case, using a tag field does not seem to be the best approach so
> > I will use a regular one and bump the version.
> > 2. Sounds good. I will fix that comment.
> > 3. That is a good question. My intent was to avoid getting weird or
> cryptic
> > reasons logged on the broker so I thought that having a standard one is
> > better. As Sophie suggested something similar for the `enforceRebalance`
> > API, we could do it for both, I suppose.
> >
> > Ismael,
> > 1. That's a good point. I chose to use a tag field to avoid having to
> bump
> > the request version. In this particular case, it seems that bumping the
> > version does not cost much so it is perhaps better. I will change this.
> >
> > Sophie,
> > 1. That's a pretty good idea, thanks. Let me update the KIP to include
> > a reason in the JoinGroup request. Regarding the Consumer API, do
> > you think that there is value for KStreams to expose the possibility to
> > provide a reason? Otherwise, it might be better to use a default
> > reason in this case.
> > 2. I don't fully get your point about providing the reason to the group
> > leader assignor on the client. Do you think that we should propagate
> > it to all the consumers or to the leader as well? The user usually has
> > access to all its client logs so I am not sure that it is really
> necessary.
> > Could you elaborate?
> >
> > I will update the KIP soon.
> >
> > Best,
> > David
> >
> > On Sat, Nov 13, 2021 at 6:00 AM Sophie Blee-Goldman
> >  wrote:
> > >
> > > This sounds great, thanks David.
> > >
> > > One thought: what do you think about doing something similar for the
> > > JoinGroup request?
> > >
> > > When you only have broker logs and not client logs, it's somewhere
> between
> > > challenging and
> > > impossible to determine the reason for a rebalance that was triggered
> > > explicitly by the client or
> > > even the user. For example, when a followup rebalance is requested to
> > > assign the revoked
> > > partitions after a cooperative rebalance. Or any of the many reasons we
> > > trigger a rebalance
> > > in Kafka Streams, via the #enforceRebalance API.
> > >
> > > Perhaps we could add a parameter to that method as such:
> > >
> > > public void enforceRebalance(final String reason);
> > >
> > > Then we can add that to the JoinGroup request/ConsumerProtocol. Not
> only
> > > would it help to
> > > log this reason on the broker side, the information about who
> requested the
> > > rebalance and why
> > > could be extremely useful to have available to the group
> leader/partition
> > > assignor on the client
> > > side.
> > >
> > > Cheers,
> > > Sophie
> > >
> > > On Fri, Nov 12, 2021 at 10:05 AM Ismael Juma 
> wrote:
> > >
> > > > Thanks David, this is a worthwhile improvement. Quick question, why
> did we
> > > > pick a tagged field here?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Nov 11, 2021, 8:32 AM David Jacot
> 
> > > > wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I'd like to discuss this very small KIP which proposes to add a
> reason
> > > > > field
> > > > > to the LeaveGroupRequest in order to let the broker know why a
> member
> > > > > left the group. This would be really handy for administrators.
> > > > >
> > > > > KIP: https://cwiki.apache.org/confluence/x/eYyqCw
> > > > >
> > > > > Cheers,
> > > > > David
> > > > >
> > > >
>
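[Editor's note] The API shape Sophie proposes above can be sketched as follows. This is a hypothetical illustration of the proposal, not the actual Consumer interface as of this thread:

```java
// Hypothetical sketch of the proposed overload (not the real Kafka
// Consumer interface): the reason string would be carried in the
// JoinGroup request so the broker can log who asked for a rebalance
// and why.
public interface RebalanceEnforcer {

    // Existing parameterless form, kept for compatibility; delegates
    // with no reason given.
    default void enforceRebalance() {
        enforceRebalance(null);
    }

    // Proposed overload: client supplies a human-readable reason,
    // e.g. "assign revoked partitions after cooperative rebalance".
    void enforceRebalance(String reason);
}
```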


[jira] [Created] (KAFKA-13481) poll(Duration) hung forever need restart

2021-11-24 Thread jimmy huang (Jira)
jimmy huang created KAFKA-13481:
---

 Summary: poll(Duration) hung forever need restart
 Key: KAFKA-13481
 URL: https://issues.apache.org/jira/browse/KAFKA-13481
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.3.0
Reporter: jimmy huang


My Kafka broker version is 1.1.0.

The topic has 128 partitions, and I create 128 threads, one consumer per thread,
in one application.

My consumers always run well, but sometimes one consumer hangs in poll. Only
that consumer is stuck; the others run well.

I dumped the thread info:

  java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0004461055c8> (a sun.nio.ch.Util$3)
- locked <0x0004461055b8> (a java.util.Collections$UnmodifiableSet)
- locked <0x000449932670> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:794)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)

 

No error log, no rebalance situation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13480) IQv2: Track Position in KeyValue stores

2021-11-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13480.
--
Resolution: Fixed

> IQv2: Track Position in KeyValue stores
> ---
>
> Key: KAFKA-13480
> URL: https://issues.apache.org/jira/browse/KAFKA-13480
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Patrick Stuedi
>Priority: Major
>
> Book-keep the latest seen position in key-value stores to support IQv2



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13480) IQv2: Track Position in KeyValue stores

2021-11-24 Thread John Roesler (Jira)
John Roesler created KAFKA-13480:


 Summary: IQv2: Track Position in KeyValue stores
 Key: KAFKA-13480
 URL: https://issues.apache.org/jira/browse/KAFKA-13480
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Book-keep the latest seen position in key-value stores to support IQv2



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13479) Interactive Query v2

2021-11-24 Thread John Roesler (Jira)
John Roesler created KAFKA-13479:


 Summary: Interactive Query v2
 Key: KAFKA-13479
 URL: https://issues.apache.org/jira/browse/KAFKA-13479
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: John Roesler


Kafka Streams supports an interesting and innovative API for "peeking" into the 
internal state of running stateful stream processors from outside of the 
application, called Interactive Query (IQ). This functionality has proven 
invaluable to users over the years for everything from debugging running 
applications to serving low latency queries straight from the Streams runtime.

However, the actual interfaces for IQ were designed in the very early days of 
Kafka Streams, before the project had gained significant adoption, and in the 
absence of much precedent for this kind of API in peer projects. With the 
benefit of hindsight, we can observe several problems with the original design 
that we hope to address in a revised framework that will serve Streams users 
well for many years to come.

 

This ticket tracks the implementation of KIP-796: 
https://cwiki.apache.org/confluence/x/34xnCw



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-800: Add reason to LeaveGroupRequest

2021-11-24 Thread Gwen Shapira
+1

Thanks for driving David. Super useful.

On Wed, Nov 24, 2021 at 8:53 AM David Jacot 
wrote:

> Hi folks,
>
> I'd like to start a vote on KIP-800: Add reason to LeaveGroupRequest.
>
> KIP: https://cwiki.apache.org/confluence/x/eYyqCw
>
> Please let me know what you think.
>
> Best,
> David
>


-- 
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Permission to contribute to Apache Kafka

2021-11-24 Thread Brett Jordan
Hi,
I would like to be able to contribute to Apache Kafka.

Wiki ID: brett.jordan
Jira ID:  brett.jordan

Cheers,
Brett Jordan
Engineering Lead - FX Product
+61 423 105 564
brett.jor...@airwallex.com


[jira] [Resolved] (KAFKA-13357) Controller snapshot contains producer ids records but broker does not

2021-11-24 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-13357.

Resolution: Fixed

> Controller snapshot contains producer ids records but broker does not
> -
>
> Key: KAFKA-13357
> URL: https://issues.apache.org/jira/browse/KAFKA-13357
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Colin McCabe
>Priority: Blocker
>
> MetadataDelta ignores PRODUCER_IDS_RECORDS. A broker doesn't need this state 
> for its operation, but it needs to handle these records if we want to hold 
> the invariant that controller snapshots are equivalent to broker snapshots.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[VOTE] KIP-800: Add reason to LeaveGroupRequest

2021-11-24 Thread David Jacot
Hi folks,

I'd like to start a vote on KIP-800: Add reason to LeaveGroupRequest.

KIP: https://cwiki.apache.org/confluence/x/eYyqCw

Please let me know what you think.

Best,
David


[DISCUSS] KIP-802: Validation Support for Kafka Connect SMT Options

2021-11-24 Thread Gunnar Morling
Hey all,

I would like to propose a KIP for Apache Kafka Connect which adds
validation support for SMT-related configuration options:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options

This feature allows users to make sure an SMT is configured correctly
before actually putting a connector with that SMT in place.

Any feedback, comments, and suggestions around this proposal will
be greatly appreciated.

Thanks,

--Gunnar


[Discuss] KIP-803: Add Task ID and Connector Name to Connect Task Context

2021-11-24 Thread Sarah Story
I have written a KIP for adding Task ID and Connector name to the Connect
API task context.

Here is the doc: https://cwiki.apache.org/confluence/x/4pKqCw

Looking forward to hearing all of your thoughts!


[jira] [Created] (KAFKA-13478) KIP-802: Validation Support for Kafka Connect SMT Options

2021-11-24 Thread Gunnar Morling (Jira)
Gunnar Morling created KAFKA-13478:
--

 Summary: KIP-802: Validation Support for Kafka Connect SMT Options
 Key: KAFKA-13478
 URL: https://issues.apache.org/jira/browse/KAFKA-13478
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


Implement 
[KIP-802|https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options], 
adding validation support for SMT options.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13477) Add Task ID and Connector Name to Connect Task Context

2021-11-24 Thread Sarah Story (Jira)
Sarah Story created KAFKA-13477:
---

 Summary: Add Task ID and Connector Name to Connect Task Context
 Key: KAFKA-13477
 URL: https://issues.apache.org/jira/browse/KAFKA-13477
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sarah Story
Assignee: Sarah Story


Developers who are implementing a Kafka connector do not have a convenient way 
to access the Task ID and Connector Name within their task implementation. The 
Task ID and Connector Name are important pieces of information that can be used 
for custom metrics, debugging, and logging.

[KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
 added the Task ID and Connector Name to the SLF4J MDC. It was accepted and 
released as part of AK 2.3.0, adding the task context to the logs. It would 
be nice to take it a step further and make the Task ID and Connector Name 
available for developers to use within their task implementation however they 
see fit.
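[Editor's note] The MDC mechanism that KIP-449 relies on can be sketched with a per-thread map. This is a simplified illustration of how SLF4J's MDC works, not Connect code:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of an MDC (mapped diagnostic context), the mechanism
// KIP-449 used to expose the connector context to log layouts: per-thread
// key/value pairs that a logging pattern such as %X{connector.context}
// can read when formatting each log line.
public class MdcSketch {
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    public static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    public static String get(String key) {
        return CONTEXT.get().get(key);
    }
}
```

Because the map is thread-local, each Connect task thread sees only its own context — which is exactly why the information is visible in logs but not directly to the task implementation, the gap this KIP addresses.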



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


poll block question

2021-11-24 Thread xusheng
hi
I found something about poll that I'm not sure about, and I need some help.
My Kafka server version is 1.1.0 and my Kafka client version is 2.3.0.
Topic xxx has 128 partitions, and I start 128 threads in one application to 
consume messages.
It always ran well, but after running for about a week, I found a consumer 
hung forever.
Strangely, only one partition stopped consuming.
I dumped the thread info:

"ConsumerGroup-58" #302 prio=5 os_prio=0 tid=0x7f5419626800 nid=0x191d9 runnable [0x7f540c95]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0004461055c8> (a sun.nio.ch.Util$3)
- locked <0x0004461055b8> (a java.util.Collections$UnmodifiableSet)
- locked <0x000449932670> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:794)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)







[jira] [Created] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found

2021-11-24 Thread Richard Bosch (Jira)
Richard Bosch created KAFKA-13476:
-

 Summary: Streams crashes when non Base64 Offset Metadata is found
 Key: KAFKA-13476
 URL: https://issues.apache.org/jira/browse/KAFKA-13476
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.0
Reporter: Richard Bosch


Kafka Streams applications use the metadata stored with the committed offsets 
from previous running instances to extract timestamps.

But when the metadata field contains other data, the Base64 decoder throws an 
exception, causing the Streams application to fail.
A new offset commit is then required to stop this failure.

I've included the part of the log when we started a Kafka Streams app after 
setting the offsets using a third party tool. This tool adds some tracing 
metadata so developers and operators could debug who performed this custom 
offset commit.

 
{noformat}
2021-11-16 12:56:36.020  INFO 25 --- [-StreamThread-2] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=example-app-3, groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns and assigned partitions
java.lang.IllegalArgumentException: Illegal base64 character 7b
at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na]
at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039) ~[kafka-streams-2.7.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837) ~[kafka-streams-2.7.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818) ~[kafka-streams-2.7.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728) ~[kafka-streams-2.7.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) ~[kafka-streams-2.7.0.jar:na]
2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [streams-example-app-1] All stream threads have died. The instance will be in error state and should be closed.
java.lang.IllegalArgumentException: Illegal base64 character 7b
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) ~[kafka-streams-2.7.0.jar:na]
{noformat}
I recommend adding a try/catch block around the Base64 decode in the 
StreamTask.decodeTimestamp method and returning the unknown value when this 
occurs. This is purely for resilience when bad data is encountered.
After the Streams application performs a new offset commit, the error should 
not occur again, limiting the chance of frequently recurring warnings in the logs.

I've already made the changes and added a test for this issue, as I would like 
to contribute to Kafka.
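[Editor's note] The suggested fix can be sketched as follows. This illustrates the reporter's proposal under the assumption that the metadata layout is one version byte followed by an 8-byte timestamp; it is not the actual Kafka patch:

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class DecodeTimestampSketch {
    static final long UNKNOWN = -1L; // sentinel for "no usable timestamp"

    // Sketch of the proposed fix (assumed metadata layout: one version byte
    // followed by an 8-byte timestamp, Base64-encoded). Metadata written by
    // third-party tooling is treated as "unknown" instead of throwing
    // IllegalArgumentException and killing the stream thread.
    static long decodeTimestamp(String encoded) {
        if (encoded == null || encoded.isEmpty()) {
            return UNKNOWN;
        }
        try {
            ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
            buffer.get();            // skip the version byte
            return buffer.getLong(); // committed timestamp
        } catch (RuntimeException e) { // e.g. "Illegal base64 character 7b"
            return UNKNOWN;          // fall back instead of crashing
        }
    }
}
```

Catching `RuntimeException` rather than just `IllegalArgumentException` also covers a `BufferUnderflowException` from metadata that decodes but is too short.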



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13475) max-compaction-delay-secs reported at one clean "behind" current state

2021-11-24 Thread Joe (Jira)
Joe created KAFKA-13475:
---

 Summary: max-compaction-delay-secs reported at one clean "behind" 
current state
 Key: KAFKA-13475
 URL: https://issues.apache.org/jira/browse/KAFKA-13475
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 3.0.0
Reporter: Joe


We have a compacted topic with a very slow rate of incoming messages, along 
with other much faster topics.

On the slow topic, {{max.compaction.lag.ms}} is set to 1209600000 (14 days). 
The first message was produced on 2021-11-05, and the second message, with the 
same key, was produced 2021-11-24. The log cleaner immediately started and 
cleaned the partition, but the {{max-compaction-delay-secs}} metric continued 
reporting cleaning was 5 days delayed, until the *next* partition was cleaned:

{noformat}
[2021-11-24 12:35:33,767] INFO [kafka-log-cleaner-thread-2]:Cleanable 
partitions: 1, Delayed partitions: 1, max delay: 1150 (kafka.log.LogCleaner)
[2021-11-24 12:43:49,135] INFO Cleaner 2: Beginning cleaning of log slow-0. 
(kafka.log.LogCleaner)
[2021-11-24 12:43:49,135] INFO Cleaner 2: Building offset map for slow-0... 
(kafka.log.LogCleaner)
[2021-11-24 12:43:49,457] INFO Cleaner 2: Building offset map for log slow-0 
for 1 segments in offset range [0, 1). (kafka.log.LogCleaner)
[2021-11-24 12:43:49,457] INFO Cleaner 2: Offset map for log slow-0 complete. 
(kafka.log.LogCleaner)
[2021-11-24 12:43:49,458] INFO Cleaner 2: Cleaning log slow-0 (cleaning prior 
to Fri Nov 05 14:15:49 CET 2021, discarding tombstones prior to Thu Jan 01 
01:00:00 CET 1970)... (kafka.log.LogCleaner)
[2021-11-24 12:43:49,459] INFO Cleaner 2: Cleaning LogSegment(baseOffset=0, 
size=205, lastModifiedTime=1636118149816, 
largestRecordTimestamp=Some(1636118148817)) in log slow-0 into 0 with deletion 
horizon 0, retaining deletes. (kafka.log.LogCleaner)
[2021-11-24 12:43:49,466] INFO Cleaner 2: Swapping in cleaned segment 
LogSegment(baseOffset=0, size=205, lastModifiedTime=1636118149816, 
largestRecordTimestamp=Some(1636118148817)) for segment(s) 
List(LogSegment(baseOffset=0, size=205, lastModifiedTime=1636118149816, 
largestRecordTimestamp=Some(1636118148817))) in log Log(dir=/data/kafka/slow-0, 
topicId=UZ8ZjcljSvefr-YOKbkjGw, topic=dataset, partition=0, highWatermark=2, 
lastStableOffset=2, logStartOffset=0, logEndOffset=2) (kafka.log.LogCleaner)
[2021-11-24 12:43:49,467] INFO [kafka-log-cleaner-thread-2]: 
Log cleaner thread 2 cleaned log slow-0 (dirty section = [0, 1])
0.0 MB of log processed in 0.3 seconds (0.0 MB/sec).
Indexed 0.0 MB in 0.3 seconds (0.0 Mb/sec, 97.3% of total time)
Buffer utilization: 0.0%
Cleaned 0.0 MB in 0.0 seconds (0.0 Mb/sec, 2.7% of total time)
Start size: 0.0 MB (1 messages)
End size: 0.0 MB (1 messages)
0.0% size reduction (0.0% fewer messages)
 (kafka.log.LogCleaner)
[2021-11-24 12:43:49,467] INFO [kafka-log-cleaner-thread-2]:Cleanable 
partitions: 1, Delayed partitions: 1, max delay: 426480297 
(kafka.log.LogCleaner)
[2021-11-24 14:06:08,164] INFO Cleaner 2: Beginning cleaning of log fast-45. 
(kafka.log.LogCleaner)
...
[2021-11-24 14:06:26,576] INFO [kafka-log-cleaner-thread-2]: 
Log cleaner thread 2 cleaned log fast-45 (dirty section = [437517047, 
437564906])
1,518.5 MB of log processed in 18.4 seconds (82.5 MB/sec).
Indexed 21.3 MB in 0.4 seconds (49.8 Mb/sec, 2.3% of total time)
Buffer utilization: 0.1%
Cleaned 1,518.5 MB in 18.0 seconds (84.4 Mb/sec, 97.7% of total time)
Start size: 1,518.5 MB (12,054,661 messages)
End size: 1,500.7 MB (12,008,059 messages)
1.2% size reduction (0.4% fewer messages)
 (kafka.log.LogCleaner)
[2021-11-24 14:06:26,576] INFO [kafka-log-cleaner-thread-2]:Cleanable 
partitions: 1, Delayed partitions: 1, max delay: 3051 (kafka.log.LogCleaner)
{noformat}

Between 12:43 and 14:06, {{max-compaction-delay-secs}} was still reported as 
426480. Unfortunately, this was enough to confuse our alerts (and developers).
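
[Editor's note] For reference, the value the metric should report can be modeled with simple arithmetic. This is a simplified model of the delay definition, not the actual LogCleaner code:

```java
public class CompactionDelayModel {
    // Simplified model (not actual LogCleaner code): a partition's compaction
    // delay is how far past its deadline (earliest dirty record timestamp plus
    // max.compaction.lag.ms) the clock is, floored at zero once cleaning
    // catches up.
    static long maxCompactionDelaySecs(long nowMs,
                                       long earliestDirtyRecordTimestampMs,
                                       long maxCompactionLagMs) {
        long deadlineMs = earliestDirtyRecordTimestampMs + maxCompactionLagMs;
        return Math.max(0L, (nowMs - deadlineMs) / 1000L);
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        // A 19-day-old dirty record under a 14-day lag is 5 days overdue
        // (432000 s), on the order of the ~426480 s the reporter observed.
        System.out.println(maxCompactionDelaySecs(19 * day, 0, 14 * day)); // prints 432000
    }
}
```

Per this model, the delay should drop to zero immediately after the clean; the bug report is that the metric instead held the stale value until the next partition was cleaned.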



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.1 #22

2021-11-24 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 499919 lines...]
[2021-11-24T12:14:21.430Z] MetricsTest > testBrokerStateMetric(String) > 
kafka.metrics.MetricsTest.testBrokerStateMetric(String)[1] PASSED
[2021-11-24T12:14:21.430Z] 
[2021-11-24T12:14:21.430Z] MetricsTest > testBrokerStateMetric(String) > 
kafka.metrics.MetricsTest.testBrokerStateMetric(String)[2] STARTED
[2021-11-24T12:14:22.119Z] 
[2021-11-24T12:14:22.119Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted STARTED
[2021-11-24T12:14:23.690Z] 
[2021-11-24T12:14:23.690Z] MetricsTest > testBrokerStateMetric(String) > 
kafka.metrics.MetricsTest.testBrokerStateMetric(String)[2] PASSED
[2021-11-24T12:14:23.690Z] 
[2021-11-24T12:14:23.690Z] MetricsTest > testZkControllerMetrics(String) > 
kafka.metrics.MetricsTest.testZkControllerMetrics(String)[1] STARTED
[2021-11-24T12:14:26.805Z] 
[2021-11-24T12:14:26.805Z] MetricsTest > testZkControllerMetrics(String) > 
kafka.metrics.MetricsTest.testZkControllerMetrics(String)[1] PASSED
[2021-11-24T12:14:26.805Z] 
[2021-11-24T12:14:26.805Z] MetricsTest > 
testBrokerTopicMetricsBytesInOut(String) > 
kafka.metrics.MetricsTest.testBrokerTopicMetricsBytesInOut(String)[1] STARTED
[2021-11-24T12:14:29.159Z] 
[2021-11-24T12:14:29.159Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted PASSED
[2021-11-24T12:14:30.100Z] 
[2021-11-24T12:14:30.100Z] > Task :core:integrationTest
[2021-11-24T12:14:30.100Z] 
[2021-11-24T12:14:30.100Z] LogCleanerParameterizedIntegrationTest > 
testCleanerWithMessageFormatV0(CompressionType) > 
kafka.log.LogCleanerParameterizedIntegrationTest.testCleanerWithMessageFormatV0(CompressionType)[2]
 PASSED
[2021-11-24T12:14:30.100Z] 
[2021-11-24T12:14:30.100Z] LogCleanerParameterizedIntegrationTest > 
testCleanerWithMessageFormatV0(CompressionType) > 
kafka.log.LogCleanerParameterizedIntegrationTest.testCleanerWithMessageFormatV0(CompressionType)[3]
 STARTED
[2021-11-24T12:14:30.994Z] 
[2021-11-24T12:14:30.994Z] MetricsTest > 
testBrokerTopicMetricsBytesInOut(String) > 
kafka.metrics.MetricsTest.testBrokerTopicMetricsBytesInOut(String)[1] PASSED
[2021-11-24T12:14:30.994Z] 
[2021-11-24T12:14:30.994Z] MetricsTest > 
testBrokerTopicMetricsBytesInOut(String) > 
kafka.metrics.MetricsTest.testBrokerTopicMetricsBytesInOut(String)[2] STARTED
[2021-11-24T12:14:34.793Z] 
[2021-11-24T12:14:34.793Z] MetricsTest > 
testBrokerTopicMetricsBytesInOut(String) > 
kafka.metrics.MetricsTest.testBrokerTopicMetricsBytesInOut(String)[2] PASSED
[2021-11-24T12:14:34.793Z] 
[2021-11-24T12:14:34.793Z] MetricsTest > testJMXFilter(String) > 
kafka.metrics.MetricsTest.testJMXFilter(String)[1] STARTED
[2021-11-24T12:14:37.018Z] 
[2021-11-24T12:14:37.018Z] MetricsTest > testJMXFilter(String) > 
kafka.metrics.MetricsTest.testJMXFilter(String)[1] PASSED
[2021-11-24T12:14:37.018Z] 
[2021-11-24T12:14:37.018Z] MetricsTest > testJMXFilter(String) > 
kafka.metrics.MetricsTest.testJMXFilter(String)[2] STARTED
[2021-11-24T12:14:37.139Z] 
[2021-11-24T12:14:37.139Z] LogCleanerParameterizedIntegrationTest > 
testCleanerWithMessageFormatV0(CompressionType) > 
kafka.log.LogCleanerParameterizedIntegrationTest.testCleanerWithMessageFormatV0(CompressionType)[3]
 PASSED
[2021-11-24T12:14:37.139Z] 
[2021-11-24T12:14:37.139Z] LogCleanerParameterizedIntegrationTest > 
testCleanerWithMessageFormatV0(CompressionType) > 
kafka.log.LogCleanerParameterizedIntegrationTest.testCleanerWithMessageFormatV0(CompressionType)[4]
 STARTED
[2021-11-24T12:14:37.139Z] 
[2021-11-24T12:14:37.139Z] > Task :streams:integrationTest
[2021-11-24T12:14:37.139Z] streams-6: SMOKE-TEST-CLIENT-CLOSED
[2021-11-24T12:14:37.139Z] streams-1: SMOKE-TEST-CLIENT-CLOSED
[2021-11-24T12:14:37.139Z] streams-3: SMOKE-TEST-CLIENT-CLOSED
[2021-11-24T12:14:37.139Z] streams-4: SMOKE-TEST-CLIENT-CLOSED
[2021-11-24T12:14:37.139Z] streams-0: SMOKE-TEST-CLIENT-CLOSED
[2021-11-24T12:14:37.139Z] streams-5: SMOKE-TEST-CLIENT-CLOSED
[2021-11-24T12:14:37.139Z] streams-2: SMOKE-TEST-CLIENT-CLOSED
[2021-11-24T12:14:40.309Z] 
[2021-11-24T12:14:40.309Z] MetricsTest > testJMXFilter(String) > 
kafka.metrics.MetricsTest.testJMXFilter(String)[2] PASSED
[2021-11-24T12:14:42.581Z] 
[2021-11-24T12:14:42.581Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 8.0.
[2021-11-24T12:14:42.581Z] 
[2021-11-24T12:14:42.581Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2021-11-24T12:14:42.581Z] 
[2021-11-24T12:14:42.581Z] See 
https://docs.gradle.org/7.2/userguide/command_line_interface.html#sec:command_line_warnings
[2021-11-24T12:14:42.581Z] 
[2021-11-24T12:14:42.581Z] BUILD SUCCESSFUL in 1h 30m 39s
[2021-11-24T12:14:42.58

Re: [DISCUSS] Apache Kafka 3.1.0 release

2021-11-24 Thread David Jacot
Hi Mickael,

Thanks for reporting it. It makes sense to include it in the 3.1 release
as well, since it is a regression.

Thanks,
David

On Tue, Nov 23, 2021 at 6:52 PM Mickael Maison  wrote:
>
> Hi David,
>
> Can we also consider https://issues.apache.org/jira/browse/KAFKA-13397?
> It's essentially a regression but in a very specific case. To hit it,
> you must be running MirrorMaker in dedicated mode and have changed the
> separator of the default replication policy.
>
> Thanks,
> Mickael
>
> On Tue, Nov 23, 2021 at 4:58 PM David Jacot  
> wrote:
> >
> > Hi Ron,
> >
> > Thank you for reaching out about this. While this is clearly not a
> > regression, I agree with including it in 3.1 in order to have proper
> > and correct configuration constraints for KRaft. You can proceed.
> >
> > Cheers,
> > David
> >
> > On Tue, Nov 23, 2021 at 2:55 PM Ron Dagostino  wrote:
> > >
> > > Hi David.  I would like to nominate
> > > https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-13456
> > > "Tighten KRaft config checks/constraints" as a 3.1.0 blocker.  The
> > > existing configuration constraints/checks related to KRaft currently
> > > do not eliminate certain illegal configuration combinations. The
> > > illegal combinations do not cause harm at the moment, but we would
> > > like to implement constraints in 3.1.0 to catch them while KRaft is
> > > still in Preview.  We could add these additional checks later in 3.2.x
> > > instead, but we would like to add these as early as possible: we
> > > expect more people to begin trying KRaft with each subsequent release,
> > > and it would be best to eliminate as quickly as we can the possibility
> > > of people using configurations that would need fixing later.
> > >
> > > A patch is available at https://github.com/apache/kafka/pull/11503/.
> > >
> > > Ron
> > >
> > >
> > > On Tue, Nov 23, 2021 at 3:19 AM David Jacot  
> > > wrote:
> > > >
> > > > Hi Chris,
> > > >
> > > > Thanks for reporting both issues. As both are regressions, I agree that
> > > > they are blockers and that we should fix them for 3.1.
> > > >
> > > > Cheers,
> > > > David
> > > >
> > > > On Mon, Nov 22, 2021 at 10:50 PM Chris Egerton
> > > >  wrote:
> > > > >
> > > > > Hi David,
> > > > >
> > > > > I have another blocker to propose. KAFKA-13472 (
> > > > > https://issues.apache.org/jira/browse/KAFKA-13472) is another 
> > > > > regression in
> > > > > Connect caused by recently-merged changes for KAFKA-12487 (
> > > > > https://issues.apache.org/jira/browse/KAFKA-12487) which can lead to 
> > > > > data
> > > > > loss in sink connectors in some rare edge cases. I've opened a fix PR 
> > > > > (
> > > > > https://github.com/apache/kafka/pull/11526) already, and have also 
> > > > > opened a
> > > > > fix PR (https://github.com/apache/kafka/pull/11524) for the 
> > > > > aforementioned
> > > > > KAFKA-13469.
> > > > >
> > > > > Please let me know if we can merge a fix for this in time for the 
> > > > > 3.1.0
> > > > > release; if not, as with KAFKA-13469, we may want to revert the 
> > > > > changes for
> > > > > the PR that cause this issue (in this case, that'd be the PR for
> > > > > KAFKA-12487).
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Mon, Nov 22, 2021 at 11:42 AM Chris Egerton  
> > > > > wrote:
> > > > >
> > > > > > Hi David,
> > > > > >
> > > > > > I'd like to propose KAFKA-13469 (
> > > > > > https://issues.apache.org/jira/browse/KAFKA-13469) as a blocker. It 
> > > > > > is a
> > > > > > regression in Connect caused by recently-merged changes for 
> > > > > > KAFKA-12226 (
> > > > > > https://issues.apache.org/jira/browse/KAFKA-12226) which leads to
> > > > > > duplicate records for source tasks. I plan to have a fix PR opened 
> > > > > > by the
> > > > > > end of the day.
> > > > > >
> > > > > > Please let me know if we can merge a fix for this in time for the 
> > > > > > 3.1.0
> > > > > > release; if not, we may want to revert the changes for KAFKA-12226.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Mon, Nov 15, 2021 at 5:02 AM David Jacot 
> > > > > > 
> > > > > > wrote:
> > > > > >
> > > > > >> Hi folks,
> > > > > >>
> > > > > >> We reached the code freeze for the Apache Kafka 3.1 release on 
> > > > > >> Friday.
> > > > > >> Therefore,
> > > > > >> we will only accept blockers from now on.
> > > > > >>
> > > > > >> There already are a couple of blockers identified which were not
> > > > > >> completed before
> > > > > >> the code freeze. Please, raise any new blockers to this thread.
> > > > > >>
> > > > > >> For all the non-blocker issues targeting 3.1.0, I will move them to
> > > > > >> the next release.
> > > > > >>
> > > > > >> Cheers,
> > > > > >> David
> > > > > >>
> > > > > >> On Fri, Oct 29, 2021 at 12:20 PM Dongjin Lee  
> > > > > >> wrote:
> > > > > >> >
> > > > > >> > Hi David,
> > > > > >> >
> > > > > >> > Please update the components of the following KIPs:
> > > > > >> >
> > > > >

Re: [DISCUSS] KIP-782: Expandable batch size in producer

2021-11-24 Thread Luke Chen
Hi Artem,
Yes, I agree that if we go with random selection instead of round-robin
selection, latency will be distributed more fairly. That is, with 10
partitions, the 10th partition is always the last choice in each round under
the current design, whereas with random selection each partition has an
equal chance of being selected.

However, I think that's out of scope for this KIP. This is an existing
issue, and it would need further discussion to decide whether such a change
is necessary.

I agree that the default 32KB for "batch.max.size" might not be a huge
improvement compared with 256KB. I'm thinking, maybe default
"batch.max.size" to 64KB, and make the documentation clear that if
"batch.max.size" is increased, there is a chance that "ready" partitions
will need to wait for the next request to the broker, because of the
"max.request.size" (default 1MB) limitation. Increasing "max.request.size"
can also be considered to avoid this issue. What do you think?
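The interplay between batch size, in-flight requests, and latency can be
sanity-checked with back-of-the-envelope arithmetic (a rough sketch; the
5-in-flight / 100 ms figures are the ones quoted later in this thread, and
the class and method names here are made up for illustration):

```java
public class BatchThroughput {
    // Per-partition throughput ceiling: with N in-flight requests of one
    // batch each and a fixed produce round-trip latency, at most N batches
    // can complete per latency window.
    static double maxThroughputKbPerSec(int inFlight, int batchKb, double latencyMs) {
        return inFlight * batchKb / (latencyMs / 1000.0);
    }

    public static void main(String[] args) {
        // 5 in-flight, 100 ms latency, 32KB batches: ~1.6 MB/s per partition
        System.out.println(maxThroughputKbPerSec(5, 32, 100.0));
        // Same settings with 256KB batches: ~12.8 MB/s per partition
        System.out.println(maxThroughputKbPerSec(5, 256, 100.0));
    }
}
```

This is only a ceiling; real throughput also depends on broker-side
processing and how full each request actually is.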

Thank you.
Luke

On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
 wrote:

> >  maybe I can firstly decrease the "batch.max.size" to 32KB
>
> I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s per
> partition.  We should probably set up some testing and see if 256KB has
> problems.
>
> To illustrate latency dynamics, let's consider a simplified model: 1
> in-flight request per broker, produce latency 125ms, 256KB max request
> size, 16 partitions assigned to the same broker, every second 128KB is
> produced to each partition (total production rate is 2MB/sec).
>
> If the batch size is 16KB, then the pattern would be the following:
>
> 0ms - produce 128KB into each partition
> 0ms - take 16KB from each partition send (total 256KB)
> 125ms - complete first 16KB from each partition, send next 16KB
> 250ms - complete second 16KB, send next 16KB
> ...
> 1000ms - complete 8th 16KB from each partition
>
> from this model it's easy to see that there are 256KB that are sent
> immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> 875ms.
>
> If the batch size is 256KB, then the pattern would be the following:
>
> 0ms - produce 128KB into each partition
> 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> 125ms - complete 2 first partitions, send data from next 2 partitions
> ...
> 1000ms - complete last 2 partitions
>
> even though the pattern is different, there are still 256KB that are sent
> immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> 875ms.
>
> Now, in this example if we do strictly round-robin (current implementation)
> and we have this exact pattern (not sure how often such regular pattern
> would happen in practice -- I would expect that it would be a bit more
> random), some partitions would experience higher latency than others (not
> sure how much it would matter in practice -- in the end of the day some
> bytes produced to a topic would have higher latency and some bytes would
> have lower latency).  This pattern is easily fixed by choosing the next
> partition randomly instead of using round-robin.
>
> -Artem
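Artem's simplified model above can be checked mechanically. The sketch below
(hypothetical class name; figures taken directly from the model: 16
partitions with 128KB queued each, 256KB requests, one in-flight, 125 ms per
request) drains the queue and records how many KB go out at each send time,
confirming that 16KB and 256KB batches yield the same schedule:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class DrainModel {
    // Returns a map from send time (ms) to KB sent at that time, taking up
    // to batchKb per partition per request until the 256KB request is full.
    static Map<Integer, Integer> schedule(int batchKb) {
        final int partitions = 16, perPartitionKb = 128, requestKb = 256, latencyMs = 125;
        int[] remaining = new int[partitions];
        Arrays.fill(remaining, perPartitionKb);   // 128KB queued per partition at t=0
        Map<Integer, Integer> sentAtMs = new TreeMap<>();
        int time = 0, totalLeft = partitions * perPartitionKb;
        while (totalLeft > 0) {
            int room = requestKb, sent = 0;
            for (int p = 0; p < partitions && room > 0; p++) {
                int take = Math.min(batchKb, Math.min(remaining[p], room));
                remaining[p] -= take;
                room -= take;
                sent += take;
            }
            sentAtMs.put(time, sent);
            totalLeft -= sent;
            time += latencyMs;   // next request goes out when this one completes
        }
        return sentAtMs;
    }

    public static void main(String[] args) {
        // Both batch sizes drain 256 KB at t = 0, 125, ..., 875 ms
        System.out.println(schedule(16));
        System.out.println(schedule(256));
    }
}
```

As the model predicts, the aggregate latency distribution is identical; only
which partition's bytes ride in which request changes.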
>
> On Tue, Nov 23, 2021 at 12:08 AM Luke Chen  wrote:
>
> > Hi Tom,
> > Thanks for your comments. And thanks for Artem's explanation.
> > Below is my response:
> >
> > > Currently because buffers are allocated using batch.size it means we
> > > can handle records that are that large (e.g. one big record per
> > > batch). Doesn't the introduction of smaller buffer sizes
> > > (batch.initial.size) mean a corresponding decrease in the maximum
> > > record size that the producer can handle?
> >
> > Actually, the "batch.size" is only a threshold to decide if the batch
> > is "ready to be sent". That is, even if you set "batch.size=16KB" (the
> > default value), users can still send a single record of 20KB, as long
> > as the size is less than "max.request.size" in the producer (default
> > 1MB). Therefore, the introduction of "batch.initial.size" won't
> > decrease the maximum record size that the producer can handle.
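Luke's point, that "batch.size" gates send-readiness while
"max.request.size" caps the record, can be sketched as a simplified check
(hypothetical names; a simplification, since the real producer also counts
record overhead when enforcing "max.request.size"):

```java
public class RecordSizeCheck {
    // Simplified: a record is rejected only when it exceeds
    // max.request.size; batch.size plays no role in the per-record limit.
    static boolean accepted(int recordBytes, int maxRequestBytes) {
        return recordBytes <= maxRequestBytes;
    }

    public static void main(String[] args) {
        int batchSize = 16 * 1024;        // default batch.size
        int maxRequest = 1024 * 1024;     // default max.request.size
        int record = 20 * 1024;           // the 20KB record from the example
        // Larger than batch.size, yet still accepted by the producer:
        System.out.println(record > batchSize && accepted(record, maxRequest));
    }
}
```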
> >
> > > But isn't there the risk that drainBatchesForOneNode would end up
> > > not sending ready batches well past when they ought to be sent
> > > (according to their linger.ms), because it's sending buffers for
> > > earlier partitions too aggressively?
> >
> > Did you mean that we have a "max.request.size" per request (default
> > 1MB), and before this KIP a request could include 64 batches in a
> > single request ["batch.size" (16KB) * 64 = 1MB], but now we might only
> > be able to include 32 batches or fewer, because we aggressively put
> > more records in one batch, is that what you meant? That's a really good
> > point that I'd never thought about. I think your suggestion to go
> > through the other partitions that just fit "batch.size", or that expire
> > "linger.ms" first, before handling the one that is over the
> > "batch.size" limit, is not a good way, because it might
> > cause the one with size 

[jira] [Resolved] (KAFKA-13457) SocketChannel in Acceptor#accept is not closed upon IOException

2021-11-24 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13457.
-
Fix Version/s: 3.2.0
 Reviewer: David Jacot
   Resolution: Fixed

> SocketChannel in Acceptor#accept is not closed upon IOException
> ---
>
> Key: KAFKA-13457
> URL: https://issues.apache.org/jira/browse/KAFKA-13457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.8.0
>Reporter: Haoze Wu
>Priority: Major
> Fix For: 3.2.0
>
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new 
> connection in the `accept` function, it handles the 
> `TooManyConnectionsException` and `ConnectionThrottledException`. However, 
> the socketChannel operations (line 720 or 721 or 722) within the try block 
> may potentially throw an IOException as well, which is not handled.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()     // line 717
>     try {
>       connectionQuotas.inc(endPoint.listenerName, 
> socketChannel.socket.getInetAddress, blockedPercentMeter)
>       socketChannel.configureBlocking(false)             // line 720
>       socketChannel.socket().setTcpNoDelay(true)         // line 721
>       socketChannel.socket().setKeepAlive(true)          // line 722
>       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>         socketChannel.socket().setSendBufferSize(sendBufferSize)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>       
>         info(s"Rejected connection from ${e.ip}, address already has the 
> configured maximum of ${e.count} connections.")
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException => 
>         val ip = socketChannel.socket.getInetAddress
>         debug(s"Delaying closing of connection from $ip for 
> ${e.throttleTimeMs} ms")
>         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>         throttledSockets += DelayedCloseSocket(socketChannel, 
> endThrottleTimeMs)
>         None
>     }
>   }
> {code}
> This thrown IOException is caught in the caller `acceptNewConnections` in 
> line 706, which only prints an error message. The socketChannel that throws 
> this IOException is not closed.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
>   private def acceptNewConnections(): Unit = {
>     val ready = nioSelector.select(500)
>     if (ready > 0) {
>       val keys = nioSelector.selectedKeys()
>       val iter = keys.iterator()
>       while (iter.hasNext && isRunning) {
>         try {
>           val key = iter.next
>           iter.remove()
>           if (key.isAcceptable) {
>             accept(key).foreach { socketChannel =>
>               ...
>               do {
>                 ...
>               } while (!assignNewConnection(socketChannel, processor,
>                 retriesLeft == 0))
>             }
>           } else
>             throw new IllegalStateException("Unrecognized key state for 
> acceptor thread.")
>         } catch {
>           case e: Throwable => error("Error while accepting connection", e)   
> // line 706
>         }
>       }
>     }
>   }
> {code}
> We found during testing this would cause our Kafka clients to experience 
> errors (InvalidReplicationFactorException) for 40+ seconds when creating new 
> topics. After 40 seconds, the clients would be able to create new topics 
> successfully.
> We verified that after adding socketChannel.close() upon IOException, the 
> symptoms disappear, so the clients do not need to wait for 40s before 
> working again.
>  
>  
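The fix described above, closing the channel when an IOException escapes the
configuration calls, is a general NIO pattern. A minimal Java sketch of the
idea (hypothetical class and method names; the actual fix lives in Scala in
SocketServer.scala):

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptorSketch {
    // Accept a connection and configure it; if any IOException escapes the
    // configuration calls, close the channel instead of leaking it.
    static SocketChannel acceptConfigured(ServerSocketChannel server) throws IOException {
        SocketChannel channel = server.accept();
        if (channel == null) return null;
        try {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(true);
            channel.socket().setKeepAlive(true);
            return channel;
        } catch (IOException e) {
            // Without this close, the accepted socket stays open until it
            // times out -- the symptom described in the report.
            channel.close();
            throw e;
        }
    }
}
```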



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-798 Add possibility to write kafka headers in Kafka Console Producer

2021-11-24 Thread David Jacot
Hi Florin,

Yes, you can close the vote.

Best,
David

On Wed, Nov 24, 2021 at 3:19 AM Luke Chen  wrote:
>
> Hi Florin,
> I'm not a committer, but yes, this vote can be concluded now.
>
> Thank you.
> Luke
>
> On Wed, Nov 24, 2021 at 3:08 AM Florin Akermann 
> wrote:
>
> > Thanks all.
> >
> > The 72h window is through.
> >
> > @Committers, can this vote be concluded?
> >
> > The vote on KIP-798 would pass with:
> > 4 binding +1
> > 1 non-binding +1
> > no vetoes
> >
> > Thanks,
> > Florin
> >
> >
> > On Tue, 23 Nov 2021 at 06:59, Luke Chen  wrote:
> >
> > > Hi Florin,
> > > Thanks for the update!
> > >
> > > +1 (non-binding)
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Tue, Nov 23, 2021 at 2:00 AM Florin Akermann <
> > florin.akerm...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Bill and David,
> > > >
> > > > Thank you both for the vote.
> > > > @David: KIP is updated.
> > > >
> > > > Florin
> > > >
> > > > On Mon, 22 Nov 2021 at 18:28, David Jacot  > >
> > > > wrote:
> > > >
> > > > > Hi Florin,
> > > > >
> > > > > Thanks for the KIP. I am +1 (binding).
> > > > >
> > > > > There is a small typo in the Proposed Changes section:
> > > > > `parse.header` should be `parse.headers`.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Mon, Nov 22, 2021 at 6:20 PM Bill Bejeck 
> > wrote:
> > > > > >
> > > > > > Hi Florin,
> > > > > >
> > > > > > Thanks for the KIP, this seems like a very useful addition.
> > > > > >
> > > > > > +1(binding).
> > > > > >
> > > > > > -Bill
> > > > > >
> > > > > > On Mon, Nov 22, 2021 at 12:00 PM Florin Akermann <
> > > > > florin.akerm...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Luke and Tom
> > > > > > >
> > > > > > > @Tom: Thanks for the vote.
> > > > > > >
> > > > > > > @Luke: Thanks for the feedback.
> > > > > > >
> > > > > > > I have updated the KIP accordingly with regard to your
> > > > > > > comments on the remaining case (false, false) and the
> > > > > > > motivation.
> > > > > > >
> > > > > > > Regarding the "not only UTF-8": As far as I understand John,
> > > > > > > it is fine to limit the scope of this change to UTF-8 only, as
> > > > > > > it is a handy addition on its own. Other formats can be
> > > > > > > relatively easily supported by adding more properties in later
> > > > > > > KIPs. In my reply to John (email from 21 Nov 2021, 11:29 UTC)
> > > > > > > I also added an explanation of why I limited the scope to
> > > > > > > UTF-8 only.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Florin
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, 22 Nov 2021 at 10:32, Tom Bentley 
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Florin,
> > > > > > > >
> > > > > > > > Thanks for the KIP!
> > > > > > > >
> > > > > > > > +1 (binding),
> > > > > > > >
> > > > > > > > Kind regards,
> > > > > > > >
> > > > > > > > Tom
> > > > > > > >
> > > > > > > > On Mon, Nov 22, 2021 at 6:51 AM Luke Chen 
> > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Florin,
> > > > > > > > > Thanks for the KIP.
> > > > > > > > >
> > > > > > > > > This KIP makes sense to me. Just a comment that the
> > > > > > > > > motivation section does not clearly explain why this KIP
> > > > > > > > > is important.
> > > > > > > > > I think John already mentioned a good motivation, which is
> > > > > > > > > to support "not only UTF-8". You should put that into the
> > > > > > > > > KIP, and of course if you have other thoughts, please also
> > > > > > > > > add them to the KIP.
> > > > > > > > >
> > > > > > > > > Also, in the "public interface" section, there are 3
> > > > > > > > > "Default parsing pattern" cases; I think you should add
> > > > > > > > > the 1 remaining case (false, false) to make it complete.
> > > > > > > > >
> > > > > > > > > Otherwise, look good to me.
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sun, Nov 21, 2021 at 7:37 PM Florin Akermann <
> > > > > > > > florin.akerm...@gmail.com
> > > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi John,
> > > > > > > > > >
> > > > > > > > > > Thanks for the vote and feedback.
> > > > > > > > > >
> > > > > > > > > > The thought occurred to me too.
> > > > > > > > > >
> > > > > > > > > > Do I understand it correctly: the current version of
> > > > > > > > > > the kafka-console-producer cannot be used for anything
> > > > > > > > > > other than UTF-8 keys and values?
> > > > > > > > > > (There is no other implementation of MessageReader
> > > > > > > > > > other than the ConsoleProducer$LineMessageReader)
> > > > > > > > > > In other words, currently users seem to only apply it
> > > > > > > > > > with utf-8 strings for keys and values?
> > > > > > > > > > This is why I figured I would not devia