[jira] [Created] (KAFKA-13360) Wrong SSL messages when handshake fails

2021-10-07 Thread Rodolfo Kohn (Jira)
Rodolfo Kohn created KAFKA-13360:


 Summary: Wrong SSL messages when handshake fails
 Key: KAFKA-13360
 URL: https://issues.apache.org/jira/browse/KAFKA-13360
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.8.0
 Environment: Two VMs, one running one Kafka broker and the other one 
running kafka-console-consumer.sh.
The consumer is validating the server certificate.
Both VMs run in VirtualBox on the same laptop, 
using an internal LAN.
Latency is on the order of microseconds.
More details in attached PDF.

Reporter: Rodolfo Kohn
 Attachments: Kafka error.pdf, 
dump_192.168.56.101_192.168.56.102_32776_9093_2021_10_06_21_09_19.pcap, 
ssl_kafka_error_logs_match_ssl_logs.txt, 
ssl_kafka_error_logs_match_ssl_logs2.txt

When a consumer tries to connect to a Kafka broker and there is an error in the 
SSL handshake, such as the server sending a certificate that cannot be validated 
because its common name does not match the server/domain name, Kafka sends out 
erroneous SSL messages before sending an SSL alert. This error occurs on the 
client but can also be seen on the server.
Given the nature of the problem, it will likely occur for most if not all 
handshake errors.
I've debugged and analyzed the Kafka networking code in 
org.apache.kafka.common.network and written a detailed description of how the 
error occurs.

Attaching the pcap file and a pdf with the detailed description of where the 
error is in the networking code (SslTransportLayer, Channel, Selector).

I executed a very basic test between kafka-console-consumer and a simple 
installation of one Kafka broker with TLS.
The test consisted of a Kafka broker with a certificate that didn't match the 
domain name I used to identify the server. The CA was set up correctly to avoid 
related problems, like an unknown-CA error code. Thus, when the server sends the 
certificate to the client, the handshake fails with error code 46 (certificate 
unknown). The goal was for my tool to detect the issue and send an event 
describing a TLS handshake problem for both processes. However, the tool sent 
what appeared to be the wrong event: a TLS exception event for an unexpected 
message instead of an event for a TLS alert for certificate unknown.

I noticed that during the handshake, after the client receives Server Hello, 
Certificate, Server Key Exchange, and Server Hello Done, it sends out the same 
Client Hello it sent at the beginning and then 3 more records with all zeroes, 
in two more messages. It sent a total of 16,709 bytes, including the 289 bytes 
of the Client Hello record.
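
For context, a minimal repro sketch of the client side of this test (host names, 
store paths, and passwords are illustrative assumptions, not the original setup):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TlsMismatchRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Host name that does NOT match the CN/SAN in the broker certificate
        props.put("bootstrap.servers", "broker.internal.test:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/tmp/client.truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        // HTTPS endpoint identification forces host-name validation, so the
        // handshake should fail with alert 46 (certificate_unknown)
        props.put("ssl.endpoint.identification.algorithm", "HTTPS");
        props.put("group.id", "tls-repro");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            consumer.poll(Duration.ofSeconds(5)); // handshake failure surfaces here
        }
    }
}
```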

 

This also looks like a design error in how protocol failures are handled.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #11372: [DO NOT MERGE] MINOR: Do not send data on abortable error too

2021-10-07 Thread GitBox


guozhangwang commented on pull request #11372:
URL: https://github.com/apache/kafka/pull/11372#issuecomment-938240965


   @hachikuji yeah I think you're right that 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L508
 has this case covered.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

2021-10-07 Thread GitBox


guozhangwang commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r724594540



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -971,6 +972,9 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
 streamsUncaughtExceptionHandler
 );
 streamThread.setStateListener(streamStateListener);
+for (final Map.Entry<String, StreamsUncaughtExceptionHandler> exceptionHandler : 
topologyExceptionHandlers.entrySet()) {

Review comment:
   Should we synchronize on the `changeThreadCount` object as well?
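   
   A hedged sketch of that suggestion (the lock object name and the registration 
call are assumptions, not the actual patch):
   
   ```java
   // Sketch: take the same lock used when changing the thread count, so handler
   // registration cannot race with addStreamThread()/removeStreamThread().
   synchronized (changeThreadCount) {
       for (final Map.Entry<String, StreamsUncaughtExceptionHandler> handler :
               topologyExceptionHandlers.entrySet()) {
           // registerHandler(...) is a hypothetical stand-in for the real call
           registerHandler(streamThread, handler.getKey(), handler.getValue());
       }
   }
   ```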

##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/NamedTopologyException.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+public class NamedTopologyException extends KafkaException {
+
+final String topologyName;
+
+public NamedTopologyException(final String topologyName, final Throwable 
throwable) {

Review comment:
   Could we consider overriding the `toString` function to include the 
`topologyName`?
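   
   For instance, a minimal sketch of such an override (illustrative wording, not 
the actual patch):
   
   ```java
   @Override
   public String toString() {
       // Surface the failing topology's name in logs and stack traces
       return "NamedTopologyException{topologyName='" + topologyName + "'} " + super.toString();
   }
   ```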

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -513,8 +514,8 @@ private boolean wrappedExceptionIsIn(final Throwable 
throwable, final Set

[GitHub] [kafka] C0urante commented on pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

2021-10-07 Thread GitBox


C0urante commented on pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#issuecomment-937378866


   @hachikuji could you take a look at this when you have a chance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #11370: MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore

2021-10-07 Thread GitBox


mjsax commented on pull request #11370:
URL: https://github.com/apache/kafka/pull/11370#issuecomment-937373122


   Thanks for the PR @showuon! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

2021-10-07 Thread GitBox


rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r724430246



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched 
to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are 
submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for 
each source partition can be retrieved via
+ * {@link #committableOffsets()}, the latest-eligible offsets for each source 
partition can be retrieved, where every
+ * record up to and including the record for each returned offset has been 
either
+ * {@link SubmittedRecord#ack() acknowledged} or {@link 
#remove(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} 
can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+private static final Logger log = 
LoggerFactory.getLogger(SubmittedRecords.class);
+
+// Visible for testing
+final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+public SubmittedRecords() {
+this.records = new HashMap<>();
+}
+
+/**
+ * Enqueue a new source record before dispatching it to a producer.
+ * The returned {@link SubmittedRecord} should either be {@link 
SubmittedRecord#ack() acknowledged} in the
+ * producer callback, or {@link #remove(SubmittedRecord) removed} if the 
record could not be successfully
+ * sent to the producer.
+ * 
+ * @param record the record about to be dispatched; may not be null but 
may have a null
+ *   {@link SourceRecord#sourcePartition()} and/or {@link 
SourceRecord#sourceOffset()}
+ * @return a {@link SubmittedRecord} that can be either {@link 
SubmittedRecord#ack() acknowledged} once ack'd by
+ * the producer, or {@link #remove removed} if synchronously 
rejected by the producer
+ */
+@SuppressWarnings("unchecked")
+public SubmittedRecord submit(SourceRecord record) {
+return submit((Map<String, Object>) record.sourcePartition(), 
(Map<String, Object>) record.sourceOffset());
+}
+
+// Convenience method for testing
SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> 
offset) {
+SubmittedRecord result = new SubmittedRecord(partition, offset);
+records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+.add(result);
+return result;
+}
+
+/**
+ * Remove a source record and do not take it into account any longer when 
tracking offsets.
+ * Useful if the record has been synchronously rejected by the producer.
+ * @param record the {@link #submit previously-submitted} record to stop 
tracking; may not be null
+ */
+public void remove(SubmittedRecord record) {
+Deque<SubmittedRecord> deque = records.get(record.partition());
+if (deque == null) {
+log.warn("Attempted to remove record for partition {}, but no 
records with that partition are present", record.partition());

Review comment:
   IIUC this is really an unexpected condition, since it's single-threaded 
and this method is called only in the catch block after sending a record to the 
producer. But without that context, anyone reading this message in the log file 
might be confused or even concerned about what "remove record for partition..." 
means. WDYT about mentioning more of that context, something like:
   ```suggestion
   log.warn("Attempted to remove record from submitted queue for 
partition {}, but no records with that partition appear to have been 
submitted", record.partition());
   ```
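   
   For reference, a short sketch of the usage pattern the Javadoc above 
describes (the producer wiring and the record variables are assumptions):
   
   ```java
   SubmittedRecords submitted = new SubmittedRecords();
   SubmittedRecord handle = submitted.submit(sourceRecord); // track before dispatch
   producer.send(producerRecord, (metadata, exception) -> {
       if (exception == null) {
           handle.ack(); // offset becomes eligible via committableOffsets()
       }
   });
   // If the producer rejects the record synchronously:
   // submitted.remove(handle);
   ```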

##
File path: 
connect/runtime/src/main/

[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

2021-10-07 Thread GitBox


TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-937864412


   Unrelated tests locally and in jenkins appear flaky. All tests related to 
this change pass deterministically.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #11370: MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore

2021-10-07 Thread GitBox


mjsax merged pull request #11370:
URL: https://github.com/apache/kafka/pull/11370


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hojongs commented on a change in pull request #11249: Fix wrong link to "Guarantees" part in introduction.html documentation

2021-10-07 Thread GitBox


hojongs commented on a change in pull request #11249:
URL: https://github.com/apache/kafka/pull/11249#discussion_r723837977



##
File path: docs/introduction.html
##
@@ -144,13 +144,13 @@ 
 
   
   
-Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.
+Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.

Review comment:
   You're right. That makes more sense.
   

##
File path: docs/introduction.html
##
@@ -144,13 +144,13 @@ 
 
   
   
-Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.
+Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.

Review comment:
   You're right. That makes more sense.
   
   Applied at f017014 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11311: KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds

2021-10-07 Thread GitBox


cmccabe merged pull request #11311:
URL: https://github.com/apache/kafka/pull/11311


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types

2021-10-07 Thread GitBox


tombentley commented on a change in pull request #11384:
URL: https://github.com/apache/kafka/pull/11384#discussion_r723930788



##
File path: connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
##
@@ -64,8 +64,14 @@ public static Schema schema(int scale) {
  * @return the encoded value
  */
 public static byte[] fromLogical(Schema schema, BigDecimal value) {
-if (value.scale() != scale(schema))
-throw new DataException("BigDecimal has mismatching scale value 
for given Decimal schema");
+int schemaScale = scale(schema);
+if (value.scale() != schemaScale)
+throw new DataException(String.format(
+"BigDecimal has mismatching scale value for given Decimal 
schema. "

Review comment:
   Do we really want to mention `BigDecimal` here (it's really an 
implementation detail)? All the user needs to know is that the scale present in 
the value differs from the scale specified in the schema.
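   
   For example (a sketch of that phrasing, not the actual change):
   
   ```java
   int schemaScale = scale(schema);
   if (value.scale() != schemaScale)
       throw new DataException(String.format(
           "Decimal value has mismatching scale for given Decimal schema: expected %d, actual %d",
           schemaScale, value.scale()));
   ```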




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2021-10-07 Thread GitBox


C0urante commented on pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#issuecomment-937394815


   @rhauch Is that second pass coming any time soon? It'd be nice if we could 
get this merged in time for the upcoming 3.1 release.
   
   I plan to address the existing comments next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #11332: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag'

2021-10-07 Thread GitBox


ableegoldman merged pull request #11332:
URL: https://github.com/apache/kafka/pull/11332


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11332: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag'

2021-10-07 Thread GitBox


ableegoldman commented on pull request #11332:
URL: https://github.com/apache/kafka/pull/11332#issuecomment-937381455






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10563: KAFKA-12487: Add support for cooperative consumer protocol with sink connectors

2021-10-07 Thread GitBox


C0urante commented on pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#issuecomment-937395625


   @kkonstantine Would you mind giving this another pass? It's been over two 
months.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types

2021-10-07 Thread GitBox


C0urante commented on a change in pull request #11384:
URL: https://github.com/apache/kafka/pull/11384#discussion_r724182155



##
File path: connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
##
@@ -64,8 +64,14 @@ public static Schema schema(int scale) {
  * @return the encoded value
  */
 public static byte[] fromLogical(Schema schema, BigDecimal value) {
-if (value.scale() != scale(schema))
-throw new DataException("BigDecimal has mismatching scale value 
for given Decimal schema");
+int schemaScale = scale(schema);
+if (value.scale() != schemaScale)
+throw new DataException(String.format(
+"BigDecimal has mismatching scale value for given Decimal 
schema. "

Review comment:
   I tried to keep the same log message as before but just append new 
information to it. Agree that `BigDecimal` is a detail that people reading only 
logs (as opposed to logs in conjunction with source code) won't benefit from; 
can adjust accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-10-07 Thread GitBox


C0urante commented on pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-937395176


   @rhauch @tombentley could either of you take a look? It'd be nice to get 
this merged in time for the upcoming 3.1 release; I know I've seen plenty of 
people led astray by continued offset commit messages for failed tasks and it'd 
be great if we could improve their experience.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-10-07 Thread GitBox


tombentley commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r723982484



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
 @@ -482,6 +486,22 @@ private synchronized void recordSent(final 
ProducerRecord<byte[], byte[]> record
 }
 }
 
+private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> 
record) {
+if (outstandingMessages.containsKey(record)) {
+currentBatchFailed = true;
+if (flushing) {
+// flush thread may be waiting on the outstanding messages to 
clear
+this.notifyAll();
+}
+} else if (outstandingMessagesBacklog.containsKey(record)) {
+backlogBatchFailed = true;
+}
+}
+
+public boolean shouldCommitOffsets() {

Review comment:
   Can we add some Javadoc
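   
   For instance, a hedged sketch of what that Javadoc (and the method) might 
look like; the wording and the body are assumed from the flags in this diff:
   
   ```java
   /**
    * Whether an offset commit should be attempted for this task: false if a
    * record send has failed in the current batch or the backlog, since
    * committing then could skip past unsent data.
    */
   public synchronized boolean shouldCommitOffsets() {
       return !currentBatchFailed && !backlogBatchFailed;
   }
   ```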

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
##
@@ -214,6 +215,12 @@ public void testCancelAfterAwaitFlush() throws Exception {
 PowerMock.verifyAll();
 }
 
+private void assertBeginFlush(boolean shouldFlush) {
+Consumer<Boolean> assertion = shouldFlush ? Assert::assertTrue : 
Assert::assertFalse;
+assertion.accept(writer.willFlush());
+assertion.accept(writer.beginFlush());

Review comment:
   Wouldn't `assertEquals(shouldFlush, writer.willFlush())` and 
`assertEquals(shouldFlush, writer.beginFlush())` be clearer?
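   
   i.e., a sketch of the suggested form:
   
   ```java
   private void assertBeginFlush(boolean shouldFlush) {
       assertEquals(shouldFlush, writer.willFlush());
       assertEquals(shouldFlush, writer.beginFlush());
   }
   ```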

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
 }
 
 private void commit(WorkerSourceTask workerTask) {
+if (!workerTask.shouldCommitOffsets()) {
+log.trace("{} Skipping offset commit as there is nothing to be 
committed", workerTask);

Review comment:
   The method used in the `if` is called `shouldCommitOffsets`, not 
`areThereOffsetsToBeCommitted`, so is this message accurate, or perhaps the 
method name is slightly misleading? Maybe amending the message to "...there are 
no offsets that should be committed"?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology

2021-10-07 Thread Hao Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425673#comment-17425673
 ] 

Hao Li commented on KAFKA-13164:


Hi Ralph,

 

Could you double-check whether the mapValues join is working given my example 
PR? Are there any gaps between our examples?

 

Thanks,

Hao

> State store is attached to wrong node in the Kafka Streams topology
> ---
>
> Key: KAFKA-13164
> URL: https://issues.apache.org/jira/browse/KAFKA-13164
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: local development (MacOS Big Sur 11.4)
>Reporter: Ralph Matthias Debusmann
>Assignee: Hao Li
>Priority: Major
> Fix For: 3.0.1
>
> Attachments: 1.jpg, 3.jpg
>
>
> Hi,
> mjsax and I noticed a bug where a state store is attached to the wrong node 
> in the Kafka Streams topology.
> The issue arose when I tried to read a topic into a KTable, then continued 
> with a mapValues(), and then joined this KTable with a KStream, like so:
>  
> var kTable = this.streamsBuilder.table(<topic>).mapValues(<value mapper function>);
>  
> and then later:
>  
> var joinedKStream = kstream.leftJoin(kTable, <value joiner>);
>  
> The join didn't work, and neither did it work when I added Materialized.as() 
> to mapValues(), like so:
> var kTable = this.streamsBuilder.table(<topic>).mapValues(<value mapper function>, *Materialized.as(<store name>)*);
>  
> Interestingly, I could get the join to work when I first read the topic 
> into a *KStream*, then continued with the mapValues(), then turned the 
> KStream into a KTable, and then joined the KTable with the other KStream, 
> like so:
>  
> var kTable = this.streamsBuilder.stream(<topic>).mapValues(<value mapper function>).toTable();
>  
> (the join worked the same as above)
>  
> When mjsax and I had a look at the topology, we could see that in the 
> former (non-working) code, the state store (required for the join) is attached 
> to the pre-final "KTABLE-SOURCE" node, and not the final "KTABLE-MAPVALUES" node 
> (see attachment "1.jpg"). In the working code, the state store is (correctly) 
> attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
>  
> Best regards,
> xdgrulez
>  
>  
>  
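
For context, a hedged sketch of the two variants described in the quoted report 
(topic names, value types, and the join logic are illustrative assumptions):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinTopologySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Variant from the report that exhibited the bug: table() + mapValues()
        KTable<String, String> mappedTable = builder.<String, String>table("table-topic-a")
                .mapValues(v -> v.toUpperCase());

        // Variant that worked: stream() + mapValues() + toTable()
        KTable<String, String> workingTable = builder.<String, String>stream("table-topic-b")
                .mapValues(v -> v.toUpperCase())
                .toTable();

        // Joining a KStream against the mapped KTable; per the report, the
        // join's state store was attached to the wrong node in the first variant.
        KStream<String, String> stream = builder.stream("stream-topic");
        stream.leftJoin(mappedTable, (left, right) -> left + ":" + right);

        // Printing the topology shows which node the state store is attached to
        System.out.println(builder.build().describe());
    }
}
```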



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13359) Round Robin Kafka Producer Routes to only half the partitions when even number of partitions

2021-10-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425463#comment-17425463
 ] 

Luke Chen commented on KAFKA-13359:
---

The root cause of this issue should be KAFKA-9965.

> Round Robin Kafka Producer Routes to only half the partitions when even 
> number of partitions
> 
>
> Key: KAFKA-13359
> URL: https://issues.apache.org/jira/browse/KAFKA-13359
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.7.0, 3.0.0
>Reporter: David G
>Priority: Minor
>
> When you have 1 message per batch in the round-robin partitioner, the 
> messages go to only half the partitions because it always skips 1 partition. 
> This works out for an odd number of partitions, because skipping 1 means all 
> partitions eventually get a hit, but with an even number half the partitions 
> never get selected.
>  
> So if you have partitions 1, 2, 3, 4: 
> Message 1: Partition 1
> Message 2: Partition 3
> Message 3: Partition 1 ... and so on. [Here 2 and 4 are never selected.]
>  
> If you have partitions 1, 2, 3:
> Message 1: Partition 1
> Message 2: Partition 3
> Message 3: Partition 2 ... and so on
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13359) Round Robin Kafka Producer Routes to only half the partitions when even number of partitions

2021-10-07 Thread David G (Jira)
David G created KAFKA-13359:
---

 Summary: Round Robin Kafka Producer Routes to only half the 
partitions when even number of partitions
 Key: KAFKA-13359
 URL: https://issues.apache.org/jira/browse/KAFKA-13359
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.0.0, 2.7.0
Reporter: David G


When you have 1 message per batch in the round-robin partitioner, the messages 
go to only half the partitions because it always skips 1 partition. This works 
out for an odd number of partitions, because skipping 1 means all partitions 
eventually get a hit, but with an even number half the partitions never get 
selected.

 

So if you have partitions 1, 2, 3, 4:

Message 1: Partition 1

Message 2: Partition 3

Message 3: Partition 1 ... and so on. [Here 2 and 4 are never selected.]

 

If you have partitions 1, 2, 3:

Message 1: Partition 1

Message 2: Partition 3

Message 3: Partition 2 ... and so on
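
A toy sketch of the stepping described above (not the actual RoundRobinPartitioner 
code; it just illustrates how advancing the counter twice per record, the behavior 
noted as KAFKA-9965 earlier in this thread, halves coverage when the partition 
count is even):

```java
public class SkipSketch {
    public static void main(String[] args) {
        int numPartitions = 4; // try 3 to see every partition get selected
        int counter = 0;
        for (int msg = 1; msg <= 6; msg++) {
            counter += 2; // counter advanced twice per record
            int partition = counter % numPartitions; // 0-based partition index
            System.out.println("Message " + msg + " -> partition " + partition);
        }
        // With 4 partitions this only ever prints partitions 2 and 0;
        // with 3 partitions the same stepping cycles through 2, 1, 0, ...
    }
}
```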

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

