[GitHub] [kafka] cmccabe commented on pull request #11385: Translate null client IDs to the empty string

2021-10-08 Thread GitBox


cmccabe commented on pull request #11385:
URL: https://github.com/apache/kafka/pull/11385#issuecomment-939210630


   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #11386: MINOR: Fix highest offset when loading KRaft metadata snapshots

2021-10-08 Thread GitBox


jsancio opened a new pull request #11386:
URL: https://github.com/apache/kafka/pull/11386


   There are a few fixes included in this commit.
   1. When loading a snapshot, the broker's `BrokerMetadataListener` was using the
   batch's append time, offset and epoch. These are not the same as the append
   time, offset and epoch from the log. We must instead use the
   `lastContainedLogTimeStamp`, `lastContainedLogOffset` and
   `lastContainedLogEpoch` from the `SnapshotReader`.
   2. Include the highest offset and epoch in the `MetadataImage` and
   `MetadataDelta`. Adding the offset and epoch to `MetadataImage` is useful to
   version the image and to simplify the API. Adding the offset and epoch to
   `MetadataDelta` is needed to generate the `MetadataImage`.
   3. Swap the order of the arguments for `ReplicaManager.applyDelta` to match
   the order of the arguments for `MetadataPublisher.publish`.
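
   The first two points can be illustrated with a minimal sketch. It assumes a delta that tracks the highest offset and epoch it has seen and, when a snapshot finishes loading, overwrites them with the snapshot's last contained log offset and epoch. `ImageSketch`, `DeltaSketch`, `replay` and `finishSnapshot` are hypothetical names used only for illustration; they are not the Kafka classes or methods touched by this pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a versioned metadata image.
final class ImageSketch {
    final long highestOffset;
    final int highestEpoch;
    final List<String> records;

    ImageSketch(long highestOffset, int highestEpoch, List<String> records) {
        this.highestOffset = highestOffset;
        this.highestEpoch = highestEpoch;
        this.records = List.copyOf(records);
    }
}

// Hypothetical stand-in for a delta that is applied on top of an image.
final class DeltaSketch {
    private final ImageSketch base;
    private final List<String> added = new ArrayList<>();
    private long highestOffset;
    private int highestEpoch;

    DeltaSketch(ImageSketch base) {
        this.base = base;
        this.highestOffset = base.highestOffset;
        this.highestEpoch = base.highestEpoch;
    }

    // Replay one record together with the offset and epoch it was read at.
    void replay(long offset, int epoch, String record) {
        added.add(record);
        highestOffset = offset;
        highestEpoch = epoch;
    }

    // After a snapshot is loaded, the position comes from the snapshot's
    // last contained log offset and epoch, not from the batches inside it.
    void finishSnapshot(long lastContainedLogOffset, int lastContainedLogEpoch) {
        highestOffset = lastContainedLogOffset;
        highestEpoch = lastContainedLogEpoch;
    }

    // The delta carries the offset and epoch needed to build the new image.
    ImageSketch apply() {
        List<String> all = new ArrayList<>(base.records);
        all.addAll(added);
        return new ImageSketch(highestOffset, highestEpoch, all);
    }
}
```

   In this sketch the image is versioned by the pair (`highestOffset`, `highestEpoch`), which is why the delta has to carry both values.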
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] junrao commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker

2021-10-08 Thread GitBox


junrao commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r72535



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -82,44 +87,148 @@
 // User topic partitions that this broker is a leader/follower for.
 private Set assignedTopicPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets.
+// Map of remote log metadata topic partition to consumed offsets. Received consumer records
+// may or may not have been processed based on the assigned topic partitions.
 private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
 
+// Map of remote log metadata topic partition to processed offsets. Received consumer record is
+// processed as the remote log metadata record's topic partition exists in assigned topic partitions.
+private final Map partitionToProcessedOffsets = new ConcurrentHashMap<>();
+
+// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
+private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+
+private final long committedOffsetSyncIntervalMs;
+private CommittedOffsetsFile committedOffsetsFile;
+private long lastSyncedTimeMs;
+
 public ConsumerTask(KafkaConsumer consumer,
 RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
-RemoteLogMetadataTopicPartitioner topicPartitioner) {
-Objects.requireNonNull(consumer);
-Objects.requireNonNull(remotePartitionMetadataEventHandler);
-Objects.requireNonNull(topicPartitioner);
-
-this.consumer = consumer;
-this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
-this.topicPartitioner = topicPartitioner;
+RemoteLogMetadataTopicPartitioner topicPartitioner,
+Path committedOffsetsPath,
+Time time,
+long committedOffsetSyncIntervalMs) {
+this.consumer = Objects.requireNonNull(consumer);
+this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
+this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+this.time = Objects.requireNonNull(time);
+this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+initializeConsumerAssignment(committedOffsetsPath);
+}
+
+private void initializeConsumerAssignment(Path committedOffsetsPath) {
+try {
+committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+
+Map committedOffsets = Collections.emptyMap();
+try {
+// Load committed offset and assign them in the consumer.
+committedOffsets = committedOffsetsFile.readEntries();
+} catch (IOException e) {
+// Ignore the error and consumer consumes from the earliest offset.
+log.error("Encountered error while building committed offsets from the file", e);
+}
+
+if (!committedOffsets.isEmpty()) {
+// Assign topic partitions from the earlier committed offsets file.
+Set earlierAssignedPartitions = committedOffsets.keySet();
+assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
+Set metadataTopicPartitions = earlierAssignedPartitions.stream()
+.map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+.collect(Collectors.toSet());
+consumer.assign(metadataTopicPartitions);
+
+// Seek to the committed offsets
+for (Map.Entry entry : committedOffsets.entrySet()) {
+partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
+partitionToProcessedOffsets.put(entry.getKey(), entry.getValue());
+consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
+}
+
+lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets);
+}
 }
 
 @Override
 public void run() {
 log.info("Started Consumer task thread.");
+lastSyncedTimeMs = time.milliseconds();
 try {
 while (!closing) {
 maybeWaitForPartitionsAssignment();
 
 log.info("Polling consumer to receive remote log metadata topic records");
-ConsumerRecords consumerRecords
-= 

[jira] [Commented] (KAFKA-13360) Wrong SSL messages when handshake fails

2021-10-08 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426334#comment-17426334
 ] 

David Mao commented on KAFKA-13360:
---

Very thorough writeup, nice find!

> Wrong SSL messages when handshake fails
> ---
>
> Key: KAFKA-13360
> URL: https://issues.apache.org/jira/browse/KAFKA-13360
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 2.8.0
> Environment: Two VMs, one running one Kafka broker and the other one
> running kafka-console-consumer.sh.
> The consumer is validating the server certificate.
> Both VMs are VirtualBox running on the same laptop.
> Using internal LAN.
> Latency is on the order of microseconds.
> More details in attached PDF.
> Reporter: Rodolfo Kohn
> Priority: Major
> Attachments: Kafka error.pdf,
> dump_192.168.56.101_192.168.56.102_32776_9093_2021_10_06_21_09_19.pcap,
> ssl_kafka_error_logs_match_ssl_logs.txt,
> ssl_kafka_error_logs_match_ssl_logs2.txt
>
>
> When a consumer tries to connect to a Kafka broker and the SSL handshake
> fails, for example because the server sends a certificate whose common name
> does not match the server/domain name, Kafka sends out erroneous SSL messages
> before sending an SSL alert. This error occurs in the client but can also be
> seen in the server.
> Because of the nature of the problem, it seems likely to happen for most if
> not all handshake errors.
> I've debugged and analyzed the Kafka networking code in
> org.apache.kafka.common.network and wrote a detailed description of how the
> error occurs.
> Attaching the pcap file and a PDF with the detailed description of where the
> error is in the networking code (SslTransportLayer, Channel, Selector).
> I executed a very basic test between kafka-console-consumer and a simple
> installation of one Kafka broker with TLS.
> The test consisted of a Kafka broker with a certificate that didn't match the
> domain name I used to identify the server. The CA was set up correctly to
> avoid related problems, such as the unknown CA error code. Thus, when the
> server sends the certificate to the client, the handshake fails with error
> code 46 (certificate unknown). The goal was that my tool would detect the
> issue and send an event describing a TLS handshake problem for both
> processes. However, I noticed the tool sent what I thought was the wrong
> event: a TLS exception event for an unexpected message instead of an event
> for a TLS alert for certificate unknown.
> I noticed that during the handshake, after the client receives Server Hello,
> Certificate, Server Key Exchange, and Server Hello Done, it sends out the
> same Client Hello it sent at the beginning and then 3 more records with all
> zeroes, in two more messages. It sent a total of 16,709 bytes, including the
> 289 bytes of the Client Hello record.
>
> This also looks like a design error in how protocol failures are handled.
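
For readers who want to check what a well-formed failure should look like on the wire, the following is a minimal sketch of recognising a plaintext TLS alert record such as the expected alert 46 (certificate unknown). It assumes the alert is sent before encryption is enabled, so the record is a 5-byte header (content type 21, two version bytes, two length bytes) followed by the alert level and description. The class and method names are hypothetical and are not part of Kafka or of the reporter's tool.

```java
final class TlsAlertSketch {
    static final int CONTENT_TYPE_ALERT = 21;   // TLS record content type for alerts
    static final int CERTIFICATE_UNKNOWN = 46;  // alert description named in the report

    // Returns the alert description, or -1 if the bytes are not a
    // plaintext alert record (header + level + description).
    static int alertDescription(byte[] record) {
        if (record.length < 7) {
            return -1;
        }
        int contentType = record[0] & 0xFF;
        int length = ((record[3] & 0xFF) << 8) | (record[4] & 0xFF);
        if (contentType != CONTENT_TYPE_ALERT || length != 2) {
            return -1;
        }
        // record[5] is the alert level, record[6] the description.
        return record[6] & 0xFF;
    }
}
```

A capture that showed only such a record after the failed certificate check would match the event the reporter expected; the repeated Client Hello and the zero-filled records described above would not parse as an alert.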



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2021-10-08 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426321#comment-17426321
 ] 

Sagar Rao commented on KAFKA-13295:
---

hey [~ableegoldman], whenever you get the chance, plz help me with the couple 
of questions from the comment above. 

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: Sagar Rao
> Priority: Critical
> Labels: eos
> Fix For: 3.1.0
>
>
> In some EOS applications with relatively long restoration times we've noticed
> a series of ProducerFencedExceptions occurring during/immediately after
> restoration. The broker logs confirmed these were due to transactions timing
> out.
> In Streams, it turns out we automatically begin a new txn when calling
> {{send}} (if there isn't already one in flight). A {{send}} often occurs
> outside a commit during active processing (e.g. writing to the changelog),
> leaving the txn open until the next commit. And if a StreamThread has been
> actively processing when a rebalance results in a new stateful task without
> revoking any existing tasks, the thread won't actually commit this open txn
> before it goes back into the restoration phase while it builds up state for
> the new task. So the in-flight transaction is left open during restoration,
> during which the StreamThread only consumes from the changelog without
> committing, leaving it vulnerable to timing out when restoration times exceed
> the configured transaction.timeout.ms for the producer client.
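
The timing argument in the description can be modelled with a small sketch. It assumes only that a transaction stays open from its first send until the next commit, and that without a commit before restoration the next commit cannot happen until restoration ends. The class, its methods and the numbers in the usage note are hypothetical; this is neither Kafka Streams code nor the fix chosen for this ticket.

```java
final class OpenTxnSketch {
    // Earliest time of the next commit: immediately if the thread commits
    // before restoring, otherwise only after restoration has finished.
    static long nextCommitMs(long restoreStartMs, long restoreDurationMs, boolean commitBeforeRestore) {
        return commitBeforeRestore ? restoreStartMs : restoreStartMs + restoreDurationMs;
    }

    // True if a transaction opened at txnStartMs is still open for longer
    // than transaction.timeout.ms when the next commit happens.
    static boolean timesOut(long txnStartMs, long nextCommitMs, long transactionTimeoutMs) {
        return nextCommitMs - txnStartMs > transactionTimeoutMs;
    }
}
```

With illustrative numbers (transaction opened at 0 ms, restoration starting at 1,000 ms and lasting 120,000 ms, timeout 60,000 ms), the model reports a timeout when nothing is committed before restoration and no timeout when the open transaction is committed first.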





[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-10-08 Thread GitBox


vamossagar12 commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-939038122


   Thanks @showuon , made the suggested changes






[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-10-08 Thread GitBox


vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r725222143



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/WindowedKeyValueIterator.java
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class WindowedKeyValueIterator extends AbstractIterator<KeyValue<Windowed<Bytes>, byte[]>> implements KeyValueIterator<Windowed<Bytes>, byte[]> {

Review comment:
   removed

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredPersistentWindowedStoreTestIterator.java
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class MeteredPersistentWindowedStoreTestIterator implements KeyValueIterator {

Review comment:
   removed








[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-10-08 Thread GitBox


vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r725221935



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
##
@@ -60,7 +103,8 @@ public void shouldRemoveExpired() {
 try (final KeyValueIterator, Long> iterator =
 sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
 ) {
-assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L)));
+assertEquals(valuesToSet(iterator), new HashSet<>(Collections.singletonList(4L)));

Review comment:
   done








[jira] [Commented] (KAFKA-12957) Refactor Streams Logical Plan Generation

2021-10-08 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426311#comment-17426311
 ] 

Sagar Rao commented on KAFKA-12957:
---

[~ableegoldman], sure. Assigned that one to myself. 

> Refactor Streams Logical Plan Generation
> 
>
> Key: KAFKA-12957
> URL: https://issues.apache.org/jira/browse/KAFKA-12957
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Major
>
> There is a general issue with Streams logical plan -> physical plan
> generation, where the physical processor nodes are generated at the parsing
> phase rather than the logical plan compilation phase. The former stage is
> agnostic to any user configuration, while only the latter stage has access to
> it; hence we should not generate physical processor nodes during the parsing
> phase (i.e. any code related to StreamsBuilder), but defer them to the logical
> plan phase (i.e. XXNode.writeToTopology). The current approach causes several
> issues: many physical processor instantiations require access to the configs,
> so we have to defer that to the `init` procedure of the node, which is
> scattered in many places from logical nodes to physical processors.
> This would be a big refactoring of Streams' logical plan generation, but I
> think it would be worth it to get this into a cleaner state.





[jira] [Assigned] (KAFKA-13336) Migrate StreamsBuilder/Topology class to interfaces and move Topology parameter from KafkaStreams constructor to #start

2021-10-08 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13336:
-

Assignee: Sagar Rao

> Migrate StreamsBuilder/Topology class to interfaces and move Topology 
> parameter from KafkaStreams constructor to #start
> ---
>
> Key: KAFKA-13336
> URL: https://issues.apache.org/jira/browse/KAFKA-13336
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: Sagar Rao
> Priority: Major
> Labels: needs-kip
>
> In order to refactor and improve the streams physical plan generation, we'll 
> need to clean up the DSL builder API a bit and in particular enforce the 
> configs be passed in from the beginning, rather than only when calling 
> #build. We can also use this opportunity to improve the disconnect between 
> the builder, the resulting Topology, and the Kafka Streams application that 
> ultimately runs this topology – at the moment these are all completely 
> uncoupled on the surface, so it's easy to think that a StreamsBuilder can be 
> reused to build multiple Topology objects, or that a Topology object could be 
> passed in to different KafkaStreams. However there is internal state that is 
> shared and modified during StreamsBuilder#build and in the KafkaStreams 
> constructor, and they are actually very coupled under the hood meaning there 
> must be a 1:1:1 ratio of builder to topology to KafkaStreams. So we need a 
> new API that
>  # Forces users to pass in the configs (Properties) when constructing the 
> builder
>  # Clarifies the relationship of the builder object to the topology, and to 
> the app itself
> I think a good API for this might look something like this:
>  # Move the StreamsBuilder class to an internal one (technically we would 
> need to keep it where it is for now until a full deprecation cycle)
>  # Introduce a TopologyBuilder interface to replace the functionality of the 
> current StreamsBuilder class, and have StreamsBuilder implement this. All the 
> current methods on StreamsBuilder will be moved to the TopologyBuilder 
> interfaces
>  # Move the Topology parameter out of the KafkaStreams constructor, and into 
> the KafkaStreams#start method, so you can construct a KafkaStreams object 
> before the Topology
>  # Add a factory method on KafkaStreams for users to get instances of the 
> TopologyBuilder, and have this accept a Properties. For example
> {code:java}
> class KafkaStreams {
>     public TopologyBuilder newTopologyBuilder(final Properties props) {
>         // convert to StreamsConfig to validate configs & check for application.id
>         final StreamsConfig config = new StreamsConfig(props);
>         return new StreamsBuilder(config);
>     }
> }{code}
> This should satisfy both of the requirements, and imo provides a cleaner API 
> anyways. Getting the builder through a factory method on the KafkaStreams 
> object should make it clear that this builder is tied to that particular 
> KafkaStreams instance. And we can enforce that it isn't reused for a 
> different application by parsing the Properties passed in to 
> KafkaStreams#newTopologyBuilder, specifically the application.id. It also 
> leads to a more natural process of writing a Kafka Streams app: start with 
> the KafkaStreams object and global configs, then use this to build up the 
> processing topology. Looking forward, this will better complement the new 
> named topologies feature, with an API that treats topologies as entities 
> attached to a particular KafkaStreams but that may come and go
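
As a rough illustration of the enforcement described above, here is a minimal sketch in which the factory method rejects configs whose application.id differs from that of the instance it is called on. Every type in it (`TopologyBuilder`, `KafkaStreamsSketch`) is a hypothetical stand-in written for this example; none is the real Kafka Streams class, and the ticket still needs a KIP to settle the actual API.

```java
import java.util.Properties;

// Hypothetical stand-in for the proposed builder interface.
interface TopologyBuilder {
    String applicationId();
}

// Hypothetical stand-in for a KafkaStreams object created before its topology.
final class KafkaStreamsSketch {
    private final String applicationId;

    KafkaStreamsSketch(Properties props) {
        this.applicationId = requireApplicationId(props);
    }

    // Factory method: the returned builder is tied to this instance, and
    // configs for a different application.id are rejected.
    TopologyBuilder newTopologyBuilder(Properties props) {
        String id = requireApplicationId(props);
        if (!id.equals(applicationId)) {
            throw new IllegalArgumentException(
                "builder configs belong to a different application: " + id);
        }
        return () -> id;
    }

    private static String requireApplicationId(Properties props) {
        String id = props.getProperty("application.id");
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("application.id is required");
        }
        return id;
    }
}
```

Passing the same Properties to both the constructor and the factory method succeeds, while Properties with another application.id fail fast, which is one way to make the 1:1:1 relationship explicit.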





[jira] [Closed] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology

2021-10-08 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li closed KAFKA-13164.
--

> State store is attached to wrong node in the Kafka Streams topology
> ---
>
> Key: KAFKA-13164
> URL: https://issues.apache.org/jira/browse/KAFKA-13164
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Environment: local development (MacOS Big Sur 11.4)
> Reporter: Ralph Matthias Debusmann
> Assignee: Hao Li
> Priority: Major
> Fix For: 3.0.1
>
> Attachments: 1.jpg, 3.jpg
>
>
> Hi,
> mjsax and I noticed a bug where a state store is attached to the wrong node
> in the Kafka Streams topology.
> The issue arose when I tried to read a topic into a KTable, then continued
> with a mapValues(), and then joined this KTable with a KStream, like so:
>
> var kTable = this.streamsBuilder.table().mapValues( function>);
>
> and then later:
>
> var joinedKStream = kstream.leftJoin(kTable, );
>
> The join didn't work, and it also didn't work when I added Materialized.as()
> to mapValues(), like so:
> var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*);
>
> Interestingly, I could get the join to work when I first read the topic
> into a *KStream*, then continued with the mapValues(), then turned the
> KStream into a KTable, and then joined the KTable with the other KStream,
> like so:
>
> var kTable = this.streamsBuilder.stream().mapValues( function>).toTable();
>
> (the join worked the same as above)
>
> When mjsax and I looked at the topology, we could see that in the former,
> non-working code, the state store (required for the join) is attached to the
> pre-final "KTABLE-SOURCE" node, and not the final "KTABLE-MAPVALUES" node
> (see attachment "1.jpg"). In the working code, the state store is (correctly)
> attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
>
> Best regards,
> xdgrulez





[jira] [Resolved] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology

2021-10-08 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li resolved KAFKA-13164.

Resolution: Cannot Reproduce

Closing this as I can't reproduce the issue


[jira] [Resolved] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-10-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13268.
---
Resolution: Duplicate

> Add more integration tests for Table Table FK joins with repartitioning
> ---
>
> Key: KAFKA-13268
> URL: https://issues.apache.org/jira/browse/KAFKA-13268
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Assignee: Victoria Xia
> Priority: Major
>
> We should add to the FK join multipartition integration test with a 
> Repartitioned for:
> 1) just the new partition count
> 2) a custom partitioner
> This is to test if there's a bug where the internal topics don't pick up a 
> partitioner provided that way.





[jira] [Commented] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-10-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426282#comment-17426282
 ] 

Guozhang Wang commented on KAFKA-13268:
---

Yup, I think KAFKA-13261 would be fully covering it. I'm going to close this 
one.



[jira] [Commented] (KAFKA-13332) New pattern-matched topic with more partitions than existing matched topics can crash Kafka Streams

2021-10-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426279#comment-17426279
 ] 

Guozhang Wang commented on KAFKA-13332:
---

Yeah I agree with that.

At the moment, Streams does not yet have a way to tell the difference during
assignment between newly created topics and existing topics, since they all
just come from the consumer's metadata, but what we can do is at least log how
we come up with the max(num.partitions) of all source topics.

> New pattern-matched topic with more partitions than existing matched topics 
> can crash Kafka Streams
> ---
>
> Key: KAFKA-13332
> URL: https://issues.apache.org/jira/browse/KAFKA-13332
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Priority: Major
>
> The partition count resolution logic in Streams is used to determine the 
> number of partitions for any repartition topics that don't already exist. 
> This is done by parsing the topology to find the number of partitions of all 
> upstream topics, and taking the max. For Pattern-subscribed subtopologies, 
> this means you need to ensure that at least one topic matching this pattern 
> is created prior to starting up the app. That topic, or topics, will 
> determine the number of partitions for any downstream repartitions.
> The problem is that repartition topics are created once, the first time the 
> app is started up. After that, during each rebalance Streams will validate 
> all repartition topics including checking for their existence, and verifying 
> they have the correct number of partitions. This check will fail if a new 
> topic is created after the first initialization, which matches the pattern 
> but has more partitions than any of the existing topics.
> This means that unfortunately, you can't create a new input topic that 
> matches the pattern your app is subscribed to unless it has equal or fewer 
> partitions than the existing matching topics. If you do, you would need to 
> stop all instances and delete the existing repartition topics before creating 
> this new topic
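The failure mode described above can be sketched in a few lines. This is a minimal illustration, not the Streams implementation: the class and method names (`RepartitionCountSketch`, `resolvePartitionCount`, `isValid`) are hypothetical, and the only behavior taken from the issue is "take the max over upstream topics at first startup, then verify the count on every rebalance".

```java
import java.util.Collection;
import java.util.List;

class RepartitionCountSketch {

    // Hypothetical helper: the repartition topic's partition count is the
    // maximum partition count over all upstream source topics.
    static int resolvePartitionCount(Collection<Integer> upstreamPartitionCounts) {
        int max = 0;
        for (int count : upstreamPartitionCounts) {
            max = Math.max(max, count);
        }
        return max;
    }

    // Hypothetical validation run on each rebalance: the already-created
    // repartition topic must match the freshly resolved count.
    static boolean isValid(int existingRepartitionPartitions,
                           Collection<Integer> upstreamPartitionCounts) {
        return existingRepartitionPartitions == resolvePartitionCount(upstreamPartitionCounts);
    }

    public static void main(String[] args) {
        // Topics matching the pattern when the app first starts.
        List<Integer> atFirstStartup = List.of(2, 4);
        int created = resolvePartitionCount(atFirstStartup);
        System.out.println("repartition topic created with " + created + " partitions");

        // A new matching topic with more partitions appears later.
        List<Integer> afterNewTopic = List.of(2, 4, 8);
        System.out.println("still valid after new topic: " + isValid(created, afterNewTopic));
    }
}
```

A new matching topic with equal or fewer partitions leaves the maximum unchanged, which is why only a larger topic trips the check.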





[GitHub] [kafka] guozhangwang commented on pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

2021-10-08 Thread GitBox


guozhangwang commented on pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#issuecomment-938839860


   @hachikuji @showuon could you take another look?






[GitHub] [kafka] guozhangwang commented on a change in pull request #11367: MINOR: Do not copy on range for in-memory shared store in stream stream left/out joins

2021-10-08 Thread GitBox


guozhangwang commented on a change in pull request #11367:
URL: https://github.com/apache/kafka/pull/11367#discussion_r725164191



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##
@@ -29,21 +29,33 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
+import java.util.NavigableSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 public class InMemoryKeyValueStore implements KeyValueStore {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryKeyValueStore.class);
 
 private final String name;
-private final NavigableMap map = new TreeMap<>();
+private final boolean copyOnRange;
+private final NavigableMap map;
+
 private volatile boolean open = false;
 private long size = 0L; // SkipListMap#size is O(N) so we just do our best 
to track it
 
+// for tests-only
 public InMemoryKeyValueStore(final String name) {
+this(name, true);
+}
+
+public InMemoryKeyValueStore(final String name, final boolean copyOnRange) 
{
 this.name = name;
+this.copyOnRange = copyOnRange;
+// if we do not need to copy on range, then we should use concurrent 
skiplist map
+// to avoid concurrent modificiation exception
+this.map = copyOnRange ? new TreeMap<>() : new 
ConcurrentSkipListMap<>();

Review comment:
   Yes, I agree, but from the flamegraphs even with this PR the major 
bottleneck is still G1GC. I'm not going to merge this PR since the change does 
not seem to improve performance significantly.
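For context on the comment in the diff about avoiding a concurrent modification exception, here is a small sketch using only the JDK. It is not the store's code: `RangeIterationSketch` and its helper are hypothetical, and it modifies the map from the same thread purely to make the behavior deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

class RangeIterationSketch {

    // Returns true if writing to the map while a range iterator is open
    // makes the iterator throw ConcurrentModificationException.
    static boolean failsOnModificationDuringIteration(NavigableMap<String, String> map) {
        map.put("a", "1");
        map.put("b", "2");
        map.put("c", "3");
        // Range view over the live map, with no defensive copy.
        Iterator<String> keys = map.subMap("a", true, "c", true).keySet().iterator();
        try {
            keys.next();
            map.put("d", "4"); // structural modification while the iterator is open
            keys.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("TreeMap fails: "
            + failsOnModificationDuringIteration(new TreeMap<>()));
        System.out.println("ConcurrentSkipListMap fails: "
            + failsOnModificationDuringIteration(new ConcurrentSkipListMap<>()));
    }
}
```

`TreeMap` iterators are fail-fast, so a range over the live map is only safe if it is copied first; `ConcurrentSkipListMap` iterators are weakly consistent and tolerate concurrent writes, which is the trade-off the `copyOnRange` flag selects between.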








[GitHub] [kafka] guozhangwang commented on a change in pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

2021-10-08 Thread GitBox


guozhangwang commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r725149597



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -735,7 +735,6 @@ public void 
shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
 topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), 
anyString());
 expectLastCall().anyTimes();
 expectRestoreToBeCompleted(consumer, changeLogReader);
-consumer.commitSync(eq(emptyMap()));

Review comment:
   SG! Will update.








[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-10-08 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426245#comment-17426245
 ] 

Victoria Xia commented on KAFKA-13261:
--

Hi [~xnix] , of course! It's no trouble at all. Could you please share an email 
address to include as a coauthor on the commit when the PR is merged? 
[https://docs.github.com/en/github/committing-changes-to-your-project/creating-and-editing-commits/creating-a-commit-with-multiple-authors#required-co-author-information]
 

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip
> Fix For: 3.1.0
>
> Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
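If the reporter's suspicion holds, the core of the problem is that two partitioners can disagree about where the same record belongs. The sketch below illustrates only that disagreement. `byForeignKey` mirrors the custom partitioner in the example (using `Math.floorMod` instead of `Math.abs(...) % n`, since `Math.abs(Integer.MIN_VALUE)` is negative), while `byCompositeKey` is a hypothetical stand-in for any partitioner that hashes the whole key; it is NOT Kafka's default partitioner.

```java
class PartitionerMismatchSketch {

    // Mirrors the custom partitioner from the example: partition by the
    // foreign key part [b] of the composite key [a, b] only.
    static int byForeignKey(String keyA, String keyB, int numPartitions) {
        return Math.floorMod(keyB.hashCode(), numPartitions);
    }

    // Hypothetical stand-in for a partitioner that hashes the whole
    // composite key. This is an assumption for illustration only.
    static int byCompositeKey(String keyA, String keyB, int numPartitions) {
        return Math.floorMod((keyA + "|" + keyB).hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String keyA = "a2";
        String keyB = "b1";
        System.out.println("custom partitioner -> partition "
            + byForeignKey(keyA, keyB, numPartitions));
        System.out.println("whole-key partitioner -> partition "
            + byCompositeKey(keyA, keyB, numPartitions));
    }
}
```

When the two results differ, a record written with one partitioner is looked up on a partition chosen by the other, which would be consistent with the missing join results; with a single partition both always return 0, matching the observation that 1 partition works.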





[GitHub] [kafka] mimaison commented on pull request #11220: KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention

2021-10-08 Thread GitBox


mimaison commented on pull request #11220:
URL: https://github.com/apache/kafka/pull/11220#issuecomment-938744186


   @OmniaGM I sent an update to the VOTE thread of your KIP: 
https://lists.apache.org/thread.html/rbdcdfe49229f7e1d2684a1a5f17364004f8b7545cd65bc35d296e327%40%3Cdev.kafka.apache.org%3E
   
   If nobody complains, I'll merge the PR early next week.
   Thanks again






[GitHub] [kafka] OmniaGM commented on a change in pull request #11220: KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention

2021-10-08 Thread GitBox


OmniaGM commented on a change in pull request #11220:
URL: https://github.com/apache/kafka/pull/11220#discussion_r725119834



##
File path: 
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
##
@@ -52,9 +52,39 @@ default String originalTopic(String topic) {
 }
 }
 
+/** Returns heartbeats topic name.*/
+default String heartbeatsTopic() {
+return "heartbeats";
+}
+
+/** Returns the offset-syncs topic for given cluster alias. */
+default String offsetSyncTopic(String clusterAlias) {
+return "mm2-offset-syncs." + clusterAlias + ".internal";
+}
+
+/** Returns the name checkpoint topic for given cluster alias. */
+default String checkpointTopic(String clusterAlias) {
+return clusterAlias + ".checkpoint.internal";

Review comment:
   Thanks for sending the update to the VOTE thread 
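To show how default methods like the ones in this diff let a policy change internal topic names, here is a self-contained sketch. `TopicNamingPolicy` is a hypothetical local interface that copies the three defaults from the hunk above; it is not the real `ReplicationPolicy` type, and `PrefixedNamingPolicy` is an invented example of an override.

```java
class NamingPolicySketch {

    // Hypothetical stand-in mirroring the default methods shown in the diff.
    interface TopicNamingPolicy {
        default String heartbeatsTopic() {
            return "heartbeats";
        }

        default String offsetSyncTopic(String clusterAlias) {
            return "mm2-offset-syncs." + clusterAlias + ".internal";
        }

        default String checkpointTopic(String clusterAlias) {
            return clusterAlias + ".checkpoint.internal";
        }
    }

    // Hypothetical custom policy: prefixes every internal topic name and
    // otherwise reuses the default naming.
    static final class PrefixedNamingPolicy implements TopicNamingPolicy {
        private final String prefix;

        PrefixedNamingPolicy(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String heartbeatsTopic() {
            return prefix + TopicNamingPolicy.super.heartbeatsTopic();
        }

        @Override
        public String offsetSyncTopic(String clusterAlias) {
            return prefix + TopicNamingPolicy.super.offsetSyncTopic(clusterAlias);
        }

        @Override
        public String checkpointTopic(String clusterAlias) {
            return prefix + TopicNamingPolicy.super.checkpointTopic(clusterAlias);
        }
    }

    public static void main(String[] args) {
        TopicNamingPolicy defaults = new TopicNamingPolicy() { };
        TopicNamingPolicy custom = new PrefixedNamingPolicy("team-a.");
        System.out.println(defaults.offsetSyncTopic("primary"));
        System.out.println(custom.offsetSyncTopic("primary"));
        System.out.println(custom.checkpointTopic("primary"));
    }
}
```

Because the names come from default methods, existing implementations keep the current topic names unless they choose to override them.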








[GitHub] [kafka] jlprat commented on pull request #11350: Scala3 migration

2021-10-08 Thread GitBox


jlprat commented on pull request #11350:
URL: https://github.com/apache/kafka/pull/11350#issuecomment-938614404


   Current status of the PR:
   - All tests pass in Scala 3 except one 
(`KafkaApis#getAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition`)
 due to this bug with no workaround: 
https://github.com/lampepfl/dotty/issues/13638
   - All tests pass in Scala 2
   - It still uses a snapshot build of Gradle
   
   In this PR there are several types of changes:
   a) Changes required by syntax changes or by the compiler now being 
stricter
   b) Changes where a workaround is present but which are fixed in upcoming 
Scala versions
   c) Changes in the build
   
   I propose performing the Scala 3 migration in several steps:
   - First, apply all changes that fall into category *a)*
   - Once a new Scala version that incorporates the mentioned fixes is 
released, revisit the changes in point *b)*
   - Lastly, once https://github.com/lampepfl/dotty/issues/13638 is resolved or 
a workaround is found and Gradle with Scala 3 support is released, we could 
tackle the final step and incorporate the changes mentioned in point *c)*
   
   
   What are your thoughts? @ijuma 
   






[GitHub] [kafka] jlprat commented on a change in pull request #11350: Scala3 migration

2021-10-08 Thread GitBox


jlprat commented on a change in pull request #11350:
URL: https://github.com/apache/kafka/pull/11350#discussion_r724963365



##
File path: core/src/main/java/kafka/server/builders/LogManagerBuilder.java
##
@@ -45,7 +46,7 @@
 private long flushStartOffsetCheckpointMs = 1L;
 private long retentionCheckMs = 1000L;
 private int maxPidExpirationMs = 6;
-private ApiVersion interBrokerProtocolVersion = ApiVersion.latestVersion();
+private ApiVersion interBrokerProtocolVersion = 
ApiVersion$.MODULE$.latestVersion();

Review comment:
   This has been fixed in upcoming Scala versions. Bug fix here: 
https://github.com/lampepfl/dotty/issues/13572

##
File path: 
core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##
@@ -239,10 +239,11 @@ class FinalizedFeatureChangeListenerTest extends 
ZooKeeperTestHarness {
 
 val exitLatch = new CountDownLatch(1)
 Exit.setExitProcedure((_, _) => exitLatch.countDown())
+val feature1: SupportedVersionRange = 
brokerFeatures.supportedFeatures.get("feature_1")

Review comment:
   This change is needed because Scala 3 seems to prioritize the highest 
type in the hierarchy when inferring, and the inferred type had package-private 
visibility. Setting the right type manually solves the problem.

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -1036,8 +1036,8 @@ object ConsumerGroupCommand extends Logging {
 val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
  .withRequiredArg
  .describedAs("timeout (ms)")
- .ofType(classOf[Long])
- .defaultsTo(5000)
+ .ofType(classOf[java.lang.Long])
+ .defaultsTo(5000L)

Review comment:
   Changes of this type are due to this bug: 
https://github.com/lampepfl/dotty/issues/13630
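As background for the `classOf[Long]` to `classOf[java.lang.Long]` change: in Scala, `classOf[Long]` denotes the primitive `long` class, whereas `classOf[java.lang.Long]` denotes the boxed class. The Java sketch below only demonstrates that these are two distinct `Class` objects; the class name `BoxedVsPrimitiveSketch` is hypothetical and nothing here reproduces the linked compiler bug.

```java
class BoxedVsPrimitiveSketch {

    // True only if the primitive and boxed class objects were the same object.
    static boolean primitiveEqualsBoxed() {
        return long.class.equals(Long.class);
    }

    public static void main(String[] args) {
        // long.class is what Scala's classOf[Long] refers to.
        System.out.println("long.class is primitive: " + long.class.isPrimitive());
        // Long.class is what classOf[java.lang.Long] refers to.
        System.out.println("Long.class is primitive: " + Long.class.isPrimitive());
        System.out.println("same class object: " + primitiveEqualsBoxed());
        // Long.TYPE is another name for the primitive class.
        System.out.println("Long.TYPE == long.class: " + (Long.TYPE == long.class));

        // A default value passed alongside the boxed class must itself be a long,
        // which is why the diff also changes 5000 to 5000L.
        Long defaultTimeoutMs = 5000L;
        System.out.println("default timeout: " + defaultTimeoutMs);
    }
}
```

Spelling out the boxed type and the `L` suffix removes any reliance on the compiler choosing between the primitive and boxed representations.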

##
File path: gradle/spotbugs-exclude.xml
##
@@ -289,6 +297,63 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
 
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

Review comment:
   This seems to me like a false positive when compiling with Scala 3
   

##
File path: gradle/spotbugs-exclude.xml
##
@@ -289,6 +297,63 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
 
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+

Review comment:
   These 2 changes seem to me like false positives when compiling with 
Scala 3

##
File path: core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
##
@@ -59,7 +59,7 @@ class IsrExpirationTest {
   @BeforeEach
   def setUp(): Unit = {
 val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
-
EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
+
EasyMock.expect(logManager.liveLogDirs).andReturn(Seq.empty[File]).anyTimes()

Review comment:
   Some of the implicit conversions that existed in the past are no longer 
present, so we need to set the types more precisely.

##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -238,14 +238,14 @@ object ReassignPartitionsCommand extends Logging {
   executeAssignment(adminClient,
 opts.options.has(opts.additionalOpt),
 
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
-opts.options.valueOf(opts.interBrokerThrottleOpt),
-opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
-opts.options.valueOf(opts.timeoutOpt))
+opts.options.valueOf(opts.interBrokerThrottleOpt).longValue(),
+opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt).longValue(),
+opts.options.valueOf(opts.timeoutOpt).longValue())

Review comment:
   Sometimes Scala 3 can't automatically widen Ints to Longs

##
File path: gradle/spotbugs-exclude.xml
##
@@ -90,9 +90,17 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
 
 
+
+
 
 
 
+
+
+
+
+

Review comment:
   This exception is due to the new naming convention for lazy fields in 
Scala 3. This is another false positive

##
File path: gradle/spotbugs-exclude.xml
##
@@ -289,6 +297,63 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
 
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+   

[jira] [Commented] (KAFKA-13265) Kafka consumers disappearing after certain point of time

2021-10-08 Thread tangzhongham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426112#comment-17426112
 ] 

tangzhongham commented on KAFKA-13265:
--

It seems that the problem comes from the consumer side. You might need to check 
the client code and take some thread dumps to see the consumer's behavior.

> Kafka consumers disappearing after certain point of time 
> -
>
> Key: KAFKA-13265
> URL: https://issues.apache.org/jira/browse/KAFKA-13265
> Project: Kafka
>  Issue Type: Test
>  Components: consumer
>Affects Versions: 2.4.0
>Reporter: Ayyandurai Mani
>Priority: Blocker
> Attachments: Consumer_Disappear_Issue_Screen.png, server.log
>
>
> Dear Kafka Team,
> We have been facing an issue for the past few days in our development 
> environment. We have a topic called 'search-service-topic-dev' and a consumer 
> group 'search-service-group' with 10 partitions, and a concurrency of 10 on the 
> consumer side. 
> When we publish more messages (each message is 115kb) to the topic, after a 
> certain point in time the consumers disappear from the consumer group 
> (note: the consumer services are still running). I have attached a screenshot 
> for reference (filename: Consumer_Disappear_Issue_Screen.png). 
> In the screenshot, when I executed the describe command for the consumer group 
> at 14:35:32 (IST) the consumers were available, but when I executed it at 
> 14:38:17 (IST) the consumers were not there. 
> I have attached the Kafka server.log for that particular time (Kafka is running 
> on a server in the UTC timezone).
> Note: The message size in each partition is around 2GB.
>  
> We are kind of blocked due to this behavior. Please help me to resolve this. 
> Thanks in advance.
> Ayyandurai
>  
>  
>  





[GitHub] [kafka] tombentley commented on pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types

2021-10-08 Thread GitBox


tombentley commented on pull request #11384:
URL: https://github.com/apache/kafka/pull/11384#issuecomment-938483939


   Thanks @C0urante!






[GitHub] [kafka] tombentley merged pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types

2021-10-08 Thread GitBox


tombentley merged pull request #11384:
URL: https://github.com/apache/kafka/pull/11384


   

