[jira] [Created] (KAFKA-13360) Wrong SSL messages when handshake fails
Rodolfo Kohn created KAFKA-13360:

Summary: Wrong SSL messages when handshake fails
Key: KAFKA-13360
URL: https://issues.apache.org/jira/browse/KAFKA-13360
Project: Kafka
Issue Type: Bug
Components: network
Affects Versions: 2.8.0
Environment: Two VMs, one running a single Kafka broker and the other running kafka-console-consumer.sh. The consumer validates the server certificate. Both VMs run in VirtualBox on the same laptop and communicate over an internal LAN. Latency is on the order of microseconds. More details in the attached PDF.
Reporter: Rodolfo Kohn
Attachments: Kafka error.pdf, dump_192.168.56.101_192.168.56.102_32776_9093_2021_10_06_21_09_19.pcap, ssl_kafka_error_logs_match_ssl_logs.txt, ssl_kafka_error_logs_match_ssl_logs2.txt

When a consumer tries to connect to a Kafka broker and the SSL handshake fails, for example because the server sends a certificate whose common name does not match the server/domain name and therefore cannot be validated, Kafka sends out erroneous SSL messages before sending an SSL alert. The error occurs in the client but can also be seen in the server. Given the nature of the problem, it seems likely to happen with most, if not all, handshake errors.

I've debugged and analyzed the Kafka networking code in org.apache.kafka.common.network and written a detailed description of how the error occurs. I am attaching the pcap file and a PDF with the detailed description of where the error is in the networking code (SslTransportLayer, Channel, Selector).

I executed a very basic test between kafka-console-consumer and a simple installation of one Kafka broker with TLS. The test consisted of a Kafka broker with a certificate that didn't match the domain name I used to identify the server. The CA was set up correctly to avoid related problems, such as the unknown CA error code. Thus, when the server sends the certificate to the client, the handshake fails with alert code 46 (certificate unknown).

The goal was for my tool to detect the issue and send an event describing a TLS handshake problem for both processes. However, I noticed the tool sent what I thought was the wrong event: a TLS exception event for an unexpected message instead of a TLS alert event for certificate unknown. I noticed that during the handshake, after the client receives Server Hello, Certificate, Server Key Exchange, and Server Hello Done, it sends out the same Client Hello it sent at the beginning and then 3 more records of all zeroes, in two more messages. It sent a total of 16,709 bytes, including the 289 bytes of the Client Hello record. This also looks like a design error in how protocol failures are handled.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
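The distinction the report draws (an expected alert record versus a repeated Client Hello followed by all-zero records) can be checked from the first bytes of each TLS record. The following is a minimal sketch, not Kafka code: `TlsRecordSketch` and `describe` are hypothetical names, and it only assumes the standard TLS record layout (1-byte content type, 2-byte version, 2-byte length) with content type 21 for alerts, 22 for handshake messages, and alert description 46 for certificate_unknown.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper (not part of Kafka): labels the first TLS record in a byte array. */
final class TlsRecordSketch {

    // Values taken from the TLS record and alert protocol definitions.
    static final int CONTENT_TYPE_ALERT = 21;
    static final int CONTENT_TYPE_HANDSHAKE = 22;
    static final int ALERT_CERTIFICATE_UNKNOWN = 46;

    static String describe(byte[] bytes) {
        if (bytes.length < 5) {
            return "incomplete record header";
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);  // big-endian by default
        int contentType = buf.get() & 0xFF;
        buf.getShort();                           // protocol version, ignored in this sketch
        int length = buf.getShort() & 0xFFFF;     // payload length declared by the header

        if (contentType == 0) {
            // An all-zero record starts with content type 0, which is not a valid type.
            return "invalid record (content type 0)";
        }
        if (contentType == CONTENT_TYPE_HANDSHAKE) {
            return "handshake record, " + length + " bytes";
        }
        if (contentType == CONTENT_TYPE_ALERT) {
            // A plaintext alert carries exactly two bytes: level and description.
            if (length == 2 && buf.remaining() >= 2) {
                int level = buf.get() & 0xFF;
                int description = buf.get() & 0xFF;
                String levelName = level == 2 ? "fatal" : level == 1 ? "warning" : "unknown-level";
                String name = description == ALERT_CERTIFICATE_UNKNOWN ? "certificate_unknown" : "other";
                return levelName + " alert: " + name + " (" + description + ")";
            }
            return "alert record (encrypted or truncated)";
        }
        return "other record, content type " + contentType;
    }

    public static void main(String[] args) {
        // A fatal certificate_unknown alert, then a header made only of zero bytes.
        System.out.println(describe(new byte[] {21, 3, 3, 0, 2, 2, 46}));
        System.out.println(describe(new byte[5]));
    }
}
```

A capture-analysis tool built along these lines would report the zero-filled records as invalid rather than as an alert, which is consistent with the "unexpected message" event described in the report.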
[GitHub] [kafka] guozhangwang commented on pull request #11372: [DO NOT MERGE] MINOR: Do not send data on abortable error too
guozhangwang commented on pull request #11372: URL: https://github.com/apache/kafka/pull/11372#issuecomment-938240965 @hachikuji yeah I think you're right that https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L508 has this case covered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies
guozhangwang commented on a change in pull request #11381: URL: https://github.com/apache/kafka/pull/11381#discussion_r724594540 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -971,6 +972,9 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin streamsUncaughtExceptionHandler ); streamThread.setStateListener(streamStateListener); +for (final Map.Entry> exceptionHandler : topologyExceptionHandlers.entrySet()) { Review comment: Should we synchronize on the `changeThreadCount` object as well? ## File path: streams/src/main/java/org/apache/kafka/streams/errors/NamedTopologyException.java ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.common.KafkaException; + +public class NamedTopologyException extends KafkaException { + +final String topologyName; + +public NamedTopologyException(final String topologyName, final Throwable throwable) { Review comment: Could we consider overriding the `toString` function to include the `topologyName`? 
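The `toString` suggestion in the review comment above could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: the class is renamed `NamedTopologyExceptionSketch` and extends `RuntimeException` so that it compiles without Kafka on the classpath (the class in the PR extends `KafkaException`), and the output format is a hypothetical choice.

```java
/** Sketch only: stands in for the PR's NamedTopologyException, which extends KafkaException. */
class NamedTopologyExceptionSketch extends RuntimeException {

    private final String topologyName;

    NamedTopologyExceptionSketch(final String topologyName, final Throwable throwable) {
        super(throwable);
        this.topologyName = topologyName;
    }

    String topologyName() {
        return topologyName;
    }

    // Include the topology name so it shows up in logs and stack trace headers.
    @Override
    public String toString() {
        return getClass().getSimpleName()
            + "{topologyName='" + topologyName + "', cause=" + getCause() + "}";
    }

    public static void main(String[] args) {
        System.out.println(new NamedTopologyExceptionSketch("orders", new IllegalStateException("boom")));
    }
}
```

Because `Throwable.printStackTrace` prints the result of `toString` on its first line, an override like this would surface the topology name wherever the exception is logged with its stack trace.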
## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -513,8 +514,8 @@ private boolean wrappedExceptionIsIn(final Throwable throwable, final Set
[GitHub] [kafka] C0urante commented on pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions
C0urante commented on pull request #11046: URL: https://github.com/apache/kafka/pull/11046#issuecomment-937378866 @hachikuji could you take a look at this when you have a chance?
[GitHub] [kafka] mjsax commented on pull request #11370: MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore
mjsax commented on pull request #11370: URL: https://github.com/apache/kafka/pull/11370#issuecomment-937373122 Thanks for the PR @showuon! Merged to `trunk`.
[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery
rhauch commented on a change in pull request #11323: URL: https://github.com/apache/kafka/pull/11323#discussion_r724430246 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying + * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were + * returned from {@link SourceTask#poll()}. 
The latest-eligible offsets for each source partition can be retrieved via + * {@link #committableOffsets()}, the latest-eligible offsets for each source partition can be retrieved, where every + * record up to and including the record for each returned offset has been either + * {@link SubmittedRecord#ack() acknowledged} or {@link #remove(SubmittedRecord) removed}. + * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be + * {@link SubmittedRecord#ack() acknowledged} from a different thread. + */ +class SubmittedRecords { + +private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class); + +// Visible for testing +final Map, Deque> records; + +public SubmittedRecords() { +this.records = new HashMap<>(); +} + +/** + * Enqueue a new source record before dispatching it to a producer. + * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the + * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully + * sent to the producer. + * + * @param record the record about to be dispatched; may not be null but may have a null + * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()} + * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by + * the producer, or {@link #remove removed} if synchronously rejected by the producer + */ +@SuppressWarnings("unchecked") +public SubmittedRecord submit(SourceRecord record) { +return submit((Map) record.sourcePartition(), (Map) record.sourceOffset()); +} + +// Convenience method for testing +SubmittedRecord submit(Map partition, Map offset) { +SubmittedRecord result = new SubmittedRecord(partition, offset); +records.computeIfAbsent(result.partition(), p -> new LinkedList<>()) +.add(result); +return result; +} + +/** + * Remove a source record and do not take it into account any longer when tracking offsets. 
+ * Useful if the record has been synchronously rejected by the producer. + * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null + */ +public void remove(SubmittedRecord record) { +Deque deque = records.get(record.partition()); +if (deque == null) { +log.warn("Attempted to remove record for partition {}, but no records with that partition are present", record.partition()); Review comment: IIUC this is really an unexpected condition, since it's single-threaded and this method is called only in the catch block after sending a record to the producer. But without that context, anyone reading this message in the log file might be confused or even concerned about what "remove record for partition..." means. WDYT about mentioning more of that context, something like: ```suggestion log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition()); ``` ## File path: connect/runtime/src/main/
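The tracking scheme described in the Javadoc above (one queue per source partition, with an offset becoming committable only when every earlier record in that partition has been acknowledged or removed) can be sketched as follows. This is a simplified illustration, not the class under review: `SubmittedOffsetsSketch` and `Entry` are hypothetical names, and partitions and offsets are reduced to a `String` and a `long` instead of the maps used by Connect.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for the per-partition tracking described above. */
final class SubmittedOffsetsSketch {

    static final class Entry {
        final String partition;
        final long offset;
        private volatile boolean acked;   // may be set from the producer callback thread

        Entry(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    private final Map<String, Deque<Entry>> records = new HashMap<>();

    /** Enqueue a record in submission order for its partition. */
    Entry submit(String partition, long offset) {
        Entry entry = new Entry(partition, offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(entry);
        return entry;
    }

    /** Stop tracking a record, e.g. after a synchronous send failure. */
    boolean remove(Entry entry) {
        Deque<Entry> deque = records.get(entry.partition);
        if (deque == null) {
            return false;                 // nothing was submitted for this partition
        }
        boolean removed = deque.removeLastOccurrence(entry);
        if (deque.isEmpty()) {
            records.remove(entry.partition);
        }
        return removed;
    }

    /** Pop acknowledged records from the head of each queue; keep the last offset popped. */
    Map<String, Long> committableOffsets() {
        Map<String, Long> result = new HashMap<>();
        records.forEach((partition, deque) -> {
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                result.put(partition, deque.pollFirst().offset);
            }
        });
        records.values().removeIf(Deque::isEmpty);
        return result;
    }

    public static void main(String[] args) {
        SubmittedOffsetsSketch tracker = new SubmittedOffsetsSketch();
        Entry first = tracker.submit("p0", 1L);
        Entry second = tracker.submit("p0", 2L);
        second.ack();                                      // acknowledged out of order
        System.out.println(tracker.committableOffsets());  // nothing yet: first is still pending
        first.ack();
        System.out.println(tracker.committableOffsets());
    }
}
```

In this sketch, `remove` returning `false` for an unknown partition corresponds to the unexpected condition the review comment asks to describe more clearly in the warning message.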
[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions
TheKnowles commented on pull request #11382: URL: https://github.com/apache/kafka/pull/11382#issuecomment-937864412 Unrelated tests locally and in jenkins appear flaky. All tests related to this change pass deterministically.
[GitHub] [kafka] mjsax merged pull request #11370: MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore
mjsax merged pull request #11370: URL: https://github.com/apache/kafka/pull/11370
[GitHub] [kafka] hojongs commented on a change in pull request #11249: Fix wrong link to "Guarantees" part in introduction.html documentation
hojongs commented on a change in pull request #11249: URL: https://github.com/apache/kafka/pull/11249#discussion_r723837977 ## File path: docs/introduction.html ## @@ -144,13 +144,13 @@ -Producers are those client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, producers never need to wait for consumers. Kafka provides various guarantees such as the ability to process events exactly-once. +Producers are those client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, producers never need to wait for consumers. Kafka provides various guarantees such as the ability to process events exactly-once. Review comment: You're right. That makes more sense. ## File path: docs/introduction.html ## @@ -144,13 +144,13 @@ -Producers are those client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, producers never need to wait for consumers. Kafka provides various guarantees such as the ability to process events exactly-once. +Producers are those client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, producers never need to wait for consumers. Kafka provides various guarantees such as the ability to process events exactly-once. Review comment: You're right. That makes more sense. Applied at f017014
[GitHub] [kafka] cmccabe merged pull request #11311: KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds
cmccabe merged pull request #11311: URL: https://github.com/apache/kafka/pull/11311
[GitHub] [kafka] tombentley commented on a change in pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types
tombentley commented on a change in pull request #11384: URL: https://github.com/apache/kafka/pull/11384#discussion_r723930788 ## File path: connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java ## @@ -64,8 +64,14 @@ public static Schema schema(int scale) { * @return the encoded value */ public static byte[] fromLogical(Schema schema, BigDecimal value) { -if (value.scale() != scale(schema)) -throw new DataException("BigDecimal has mismatching scale value for given Decimal schema"); +int schemaScale = scale(schema); +if (value.scale() != schemaScale) +throw new DataException(String.format( +"BigDecimal has mismatching scale value for given Decimal schema. " Review comment: Do we really want to mention `BigDecimal` here (it's really an implementation detail)? All the user needs to know is that the scale present in the value differs from the scale specified in the schema.
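One way to act on this review comment is to drop the type name from the message and report both scales. The sketch below is an assumption-laden illustration rather than the PR's final code: `DecimalScaleCheckSketch` is a hypothetical class, the schema is reduced to a plain `int` scale, `IllegalArgumentException` stands in for Connect's `DataException`, and the message wording is invented for the example.

```java
import java.math.BigDecimal;

/** Hypothetical stand-in for the scale check in Decimal.fromLogical. */
final class DecimalScaleCheckSketch {

    static byte[] fromLogical(int schemaScale, BigDecimal value) {
        if (value.scale() != schemaScale) {
            // Report both scales; the wording is illustrative, not the PR's final text.
            throw new IllegalArgumentException(String.format(
                "Decimal value has mismatching scale for given Decimal schema. "
                    + "Schema has scale %d, value has scale %d.",
                schemaScale, value.scale()));
        }
        // Encode the unscaled integer; the scale itself is carried by the schema.
        return value.unscaledValue().toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(fromLogical(2, new BigDecimal("1.50")).length);
        try {
            fromLogical(2, new BigDecimal("1.5"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A caller who hits the mismatch can often resolve it with `value.setScale(schemaScale)`, which throws `ArithmeticException` if rounding would be required, so no precision is lost silently.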
[GitHub] [kafka] C0urante commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)
C0urante commented on pull request #10907: URL: https://github.com/apache/kafka/pull/10907#issuecomment-937394815 @rhauch Is that second pass coming any time soon? It'd be nice if we could get this merged in time for the upcoming 3.1 release. I plan to address the existing comments next week.
[GitHub] [kafka] ableegoldman merged pull request #11332: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag'
ableegoldman merged pull request #11332: URL: https://github.com/apache/kafka/pull/11332
[GitHub] [kafka] ableegoldman commented on pull request #11332: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag'
ableegoldman commented on pull request #11332: URL: https://github.com/apache/kafka/pull/11332#issuecomment-937381455
[GitHub] [kafka] C0urante commented on pull request #10563: KAFKA-12487: Add support for cooperative consumer protocol with sink connectors
C0urante commented on pull request #10563: URL: https://github.com/apache/kafka/pull/10563#issuecomment-937395625 @kkonstantine Would you mind giving this another pass? It's been over two months.
[GitHub] [kafka] C0urante commented on a change in pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types
C0urante commented on a change in pull request #11384: URL: https://github.com/apache/kafka/pull/11384#discussion_r724182155 ## File path: connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java ## @@ -64,8 +64,14 @@ public static Schema schema(int scale) { * @return the encoded value */ public static byte[] fromLogical(Schema schema, BigDecimal value) { -if (value.scale() != scale(schema)) -throw new DataException("BigDecimal has mismatching scale value for given Decimal schema"); +int schemaScale = scale(schema); +if (value.scale() != schemaScale) +throw new DataException(String.format( +"BigDecimal has mismatching scale value for given Decimal schema. " Review comment: I tried to keep the same log message as before but just append new information to it. Agree that `BigDecimal` is a detail that people reading only logs (as opposed to logs in conjunction with source code) won't benefit from; can adjust accordingly.
[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
C0urante commented on pull request #10528: URL: https://github.com/apache/kafka/pull/10528#issuecomment-937395176 @rhauch @tombentley could either of you take a look? It'd be nice to get this merged in time for the upcoming 3.1 release; I know I've seen plenty of people led astray by continued offset commit messages for failed tasks and it'd be great if we could improve their experience.
[GitHub] [kafka] tombentley commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
tombentley commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r723982484 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -482,6 +486,22 @@ private synchronized void recordSent(final ProducerRecord record } } +private synchronized void recordSendFailed(ProducerRecord record) { +if (outstandingMessages.containsKey(record)) { +currentBatchFailed = true; +if (flushing) { +// flush thread may be waiting on the outstanding messages to clear +this.notifyAll(); +} +} else if (outstandingMessagesBacklog.containsKey(record)) { +backlogBatchFailed = true; +} +} + +public boolean shouldCommitOffsets() { Review comment: Can we add some Javadoc ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java ## @@ -214,6 +215,12 @@ public void testCancelAfterAwaitFlush() throws Exception { PowerMock.verifyAll(); } +private void assertBeginFlush(boolean shouldFlush) { +Consumer assertion = shouldFlush ? Assert::assertTrue : Assert::assertFalse; +assertion.accept(writer.willFlush()); +assertion.accept(writer.beginFlush()); Review comment: Wouldn't `assertEquals(shouldFlush, writer.willFlush())` and `assertEquals(shouldFlush, writer.beginFlush())` be clearer? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ## @@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) { } private void commit(WorkerSourceTask workerTask) { +if (!workerTask.shouldCommitOffsets()) { +log.trace("{} Skipping offset commit as there is nothing to be committed", workerTask); Review comment: The method used in the `if` is called `shouldCommitOffsets`, not `areThereOffsetsToBeCommitted`, so is this message accurate, or perhaps the method name is slightly misleading? Maybe amending the message to "...there are no offsets that should be committed"? 
[jira] [Commented] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425673#comment-17425673 ]

Hao Li commented on KAFKA-13164:

Hi Ralph,

Could you double-check whether the map value join works with my example PR? Are there any gaps between our examples?

Thanks,
Hao

> State store is attached to wrong node in the Kafka Streams topology
> -------------------------------------------------------------------
>
> Key: KAFKA-13164
> URL: https://issues.apache.org/jira/browse/KAFKA-13164
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Environment: local development (MacOS Big Sur 11.4)
> Reporter: Ralph Matthias Debusmann
> Assignee: Hao Li
> Priority: Major
> Fix For: 3.0.1
>
> Attachments: 1.jpg, 3.jpg
>
>
> Hi,
> mjsax and I noticed a bug where a state store is attached to the wrong node
> in the Kafka Streams topology.
> The issue arose when I tried to read a topic into a KTable, then continued
> with a mapValues(), and then joined this KTable with a KStream, like so:
>
> var kTable = this.streamsBuilder.table().mapValues( function>);
>
> and then later:
>
> var joinedKStream = kstream.leftJoin(kTable, );
>
> The join didn't work, and neither did it work when I added Materialized.as()
> to mapValues(), like so:
> var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*);
>
> Interestingly, I could get the join to work when I first read the topic
> into a *KStream*, then continued with the mapValues(), then turned the
> KStream into a KTable, and then joined the KTable with the other KStream,
> like so:
>
> var kTable = this.streamsBuilder.stream().mapValues( function>).toTable();
>
> (the join worked the same as above)
>
> When mjsax and I had a look at the topology, we could see that in the
> former, non-working code, the state store (required for the join) is attached
> to the pre-final "KTABLE-SOURCE" node, and not to the final "KTABLE-MAPVALUES" node
> (see attachment "1.jpg"). In the working code, the state store is (correctly)
> attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
>
> Best regards,
> xdgrulez
[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions
TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-937864412

Unrelated tests appear flaky both locally and in Jenkins. All tests related to this change pass deterministically.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types
C0urante commented on a change in pull request #11384:
URL: https://github.com/apache/kafka/pull/11384#discussion_r724182155

## File path: connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java ##
@@ -64,8 +64,14 @@ public static Schema schema(int scale) {
      * @return the encoded value
      */
     public static byte[] fromLogical(Schema schema, BigDecimal value) {
-        if (value.scale() != scale(schema))
-            throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
+        int schemaScale = scale(schema);
+        if (value.scale() != schemaScale)
+            throw new DataException(String.format(
+                "BigDecimal has mismatching scale value for given Decimal schema. "

Review comment:
I tried to keep the same log message as before but just append new information to it. Agree that `BigDecimal` is a detail that people reading only logs (as opposed to logs in conjunction with source code) won't benefit from; can adjust accordingly.
[jira] [Commented] (KAFKA-13359) Round Robin Kafka Producer Routes to only half the partitions when even number of partitions
[ https://issues.apache.org/jira/browse/KAFKA-13359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425463#comment-17425463 ]

Luke Chen commented on KAFKA-13359:
---

The root cause of this issue should be KAFKA-9965.

> Round Robin Kafka Producer Routes to only half the partitions when even
> number of partitions
>
>
> Key: KAFKA-13359
> URL: https://issues.apache.org/jira/browse/KAFKA-13359
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.7.0, 3.0.0
> Reporter: David G
> Priority: Minor
>
> When you have 1 message per batch in the round robin Partitioner, the
> messages go to only half the partitions because it always skips 1 partition.
> This works out for an odd number of partitions because skipping 1 means all
> partitions get a hit eventually, but with an even number half the partitions
> never get selected.
>
> So if you have partitions 1, 2, 3, 4
> Message 1: Partition 1
> Message 2: Partition 3
> Message 3: Partition 1 ... and so on. [Here 2 and 4 are never selected]
>
> If you have partitions 1, 2, 3
> Message 1: Partition 1
> Message 2: Partition 3
> Message 3: Partition 2 ... and so on
[jira] [Created] (KAFKA-13359) Round Robin Kafka Producer Routes to only half the partitions when even number of partitions
David G created KAFKA-13359:
---

Summary: Round Robin Kafka Producer Routes to only half the partitions when even number of partitions
Key: KAFKA-13359
URL: https://issues.apache.org/jira/browse/KAFKA-13359
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 3.0.0, 2.7.0
Reporter: David G

When you have 1 message per batch in the round robin Partitioner, the messages go to only half the partitions because it always skips 1 partition. This works out for an odd number of partitions because skipping 1 means all partitions get a hit eventually, but with an even number half the partitions never get selected.

So if you have partitions 1, 2, 3, 4
Message 1: Partition 1
Message 2: Partition 3
Message 3: Partition 1 ... and so on. [Here 2 and 4 are never selected]

If you have partitions 1, 2, 3
Message 1: Partition 1
Message 2: Partition 3
Message 3: Partition 2 ... and so on
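The pattern described in this report can be reproduced with a small simulation. This is only a sketch under assumptions: it models the "skips 1 partition" behaviour as a counter that advances by two per message, uses zero-based partition indices (so the report's partitions 1 and 3 appear as 0 and 2), and the class and method names are hypothetical. It is not the RoundRobinPartitioner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; not taken from the Kafka code base.
final class RoundRobinSkipSketch {

    /**
     * Returns the zero-based partition chosen for each of `messages` messages
     * when the round-robin counter advances by `step` per message.
     * step = 1 is plain round robin; step = 2 models skipping one partition.
     */
    static List<Integer> assign(int numPartitions, int messages, int step) {
        List<Integer> chosen = new ArrayList<>();
        int counter = 0;
        for (int i = 0; i < messages; i++) {
            chosen.add(counter % numPartitions);
            counter += step;
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 6, 2)); // [0, 2, 0, 2, 0, 2]: partitions 1 and 3 are never hit
        System.out.println(assign(3, 6, 2)); // [0, 2, 1, 0, 2, 1]: every partition is hit
    }
}
```

With a step of two, an even partition count only ever reaches the even indices, while an odd count cycles through all of them, which matches the two examples in the report.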
[GitHub] [kafka] tombentley commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
tombentley commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r723982484

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ##
@@ -482,6 +486,22 @@ private synchronized void recordSent(final ProducerRecord record
         }
     }
 
+    private synchronized void recordSendFailed(ProducerRecord record) {
+        if (outstandingMessages.containsKey(record)) {
+            currentBatchFailed = true;
+            if (flushing) {
+                // flush thread may be waiting on the outstanding messages to clear
+                this.notifyAll();
+            }
+        } else if (outstandingMessagesBacklog.containsKey(record)) {
+            backlogBatchFailed = true;
+        }
+    }
+
+    public boolean shouldCommitOffsets() {

Review comment:
Can we add some Javadoc?

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java ##
@@ -214,6 +215,12 @@ public void testCancelAfterAwaitFlush() throws Exception {
         PowerMock.verifyAll();
     }
 
+    private void assertBeginFlush(boolean shouldFlush) {
+        Consumer assertion = shouldFlush ? Assert::assertTrue : Assert::assertFalse;
+        assertion.accept(writer.willFlush());
+        assertion.accept(writer.beginFlush());

Review comment:
Wouldn't `assertEquals(shouldFlush, writer.willFlush())` and `assertEquals(shouldFlush, writer.beginFlush())` be clearer?

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ##
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
     }
 
     private void commit(WorkerSourceTask workerTask) {
+        if (!workerTask.shouldCommitOffsets()) {
+            log.trace("{} Skipping offset commit as there is nothing to be committed", workerTask);

Review comment:
The method used in the `if` is called `shouldCommitOffsets`, not `areThereOffsetsToBeCommitted`, so is this message accurate, or is the method name perhaps slightly misleading? Maybe amend the message to "...there are no offsets that should be committed"?
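To compare the two assertion styles discussed for `assertBeginFlush`, here is a minimal self-contained sketch. Everything in it is an assumption made for illustration: `FakeWriter` is a hypothetical stand-in for the writer under test, and the local `assertTrue`, `assertFalse` and `assertEquals` helpers only imitate the JUnit methods so the example runs without a test framework.

```java
import java.util.function.Consumer;

// Hypothetical sketch; none of these types come from the pull request.
final class AssertBeginFlushSketch {

    /** Stand-in for the writer under test: both calls report whether there is data to flush. */
    static final class FakeWriter {
        private final boolean hasData;
        FakeWriter(boolean hasData) { this.hasData = hasData; }
        boolean willFlush() { return hasData; }
        boolean beginFlush() { return hasData; }
    }

    // Local imitations of the JUnit assertions.
    static void assertTrue(boolean condition) {
        if (!condition) throw new AssertionError("expected true");
    }

    static void assertFalse(boolean condition) {
        if (condition) throw new AssertionError("expected false");
    }

    static void assertEquals(boolean expected, boolean actual) {
        if (expected != actual) throw new AssertionError("expected " + expected + " but was " + actual);
    }

    /** Style used in the diff: pick the assertion through a method reference. */
    static void assertBeginFlushWithConsumer(FakeWriter writer, boolean shouldFlush) {
        Consumer<Boolean> assertion = shouldFlush
                ? AssertBeginFlushSketch::assertTrue
                : AssertBeginFlushSketch::assertFalse;
        assertion.accept(writer.willFlush());
        assertion.accept(writer.beginFlush());
    }

    /** Style suggested in the review: compare directly against the expected value. */
    static void assertBeginFlushWithEquals(FakeWriter writer, boolean shouldFlush) {
        assertEquals(shouldFlush, writer.willFlush());
        assertEquals(shouldFlush, writer.beginFlush());
    }

    public static void main(String[] args) {
        assertBeginFlushWithConsumer(new FakeWriter(true), true);
        assertBeginFlushWithEquals(new FakeWriter(false), false);
    }
}
```

Both helpers accept and reject the same inputs; the `assertEquals` form states the expectation directly and reports the expected and actual values on failure.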
[GitHub] [kafka] tombentley commented on a change in pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types
tombentley commented on a change in pull request #11384:
URL: https://github.com/apache/kafka/pull/11384#discussion_r723930788

## File path: connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java ##
@@ -64,8 +64,14 @@ public static Schema schema(int scale) {
      * @return the encoded value
      */
     public static byte[] fromLogical(Schema schema, BigDecimal value) {
-        if (value.scale() != scale(schema))
-            throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
+        int schemaScale = scale(schema);
+        if (value.scale() != schemaScale)
+            throw new DataException(String.format(
+                "BigDecimal has mismatching scale value for given Decimal schema. "

Review comment:
Do we really want to mention `BigDecimal` here (it's really an implementation detail)? All the user needs to know is that the scale present in the value differs from the scale specified in the schema.
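A message along the lines suggested in this comment might look like the sketch below. It is an illustration under assumptions, not the code from the pull request: the class name, the exact wording, and the use of `IllegalArgumentException` (in place of Connect's `DataException`, so that the example needs no Kafka dependency) are all hypothetical, and the schema is reduced to its scale.

```java
import java.math.BigDecimal;

// Hypothetical sketch; the message wording is an assumption, not the merged text.
final class ScaleCheckSketch {

    /**
     * Encodes the unscaled value as bytes, rejecting values whose scale differs
     * from the schema's scale. The error names both scales and leaves out the
     * Java type of the value.
     */
    static byte[] encode(int schemaScale, BigDecimal value) {
        if (value.scale() != schemaScale) {
            throw new IllegalArgumentException(String.format(
                    "Decimal value has mismatching scale for given Decimal schema. "
                            + "Schema has scale %d, value has scale %d.",
                    schemaScale, value.scale()));
        }
        return value.unscaledValue().toByteArray();
    }

    public static void main(String[] args) {
        encode(2, new BigDecimal("1.50")); // scales match, so no exception
        try {
            encode(2, new BigDecimal("1.5")); // value scale is 1, schema scale is 2
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Reporting both scales lets someone reading only the logs see which side is wrong without consulting the source.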