[GitHub] [kafka] showuon commented on a diff in pull request #13297: Kafka-14743: update request metrics after callback
showuon commented on code in PR #13297: URL: https://github.com/apache/kafka/pull/13297#discussion_r1117875392 ## core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala: ## @@ -163,7 +164,9 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { } def testV1Fetch(isFollowerFetch: Boolean): Unit = { +val fetchMessageConversionsTimeMsMetricName = s"$MessageConversionsTimeMs,request=Fetch" Review Comment: Yes, we already used the constants from `RequestMetrics.MessageConversionsTimeMs`. It's just the fetch/produce request is another tag name that needed to be filtered out. Thanks. ## core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala: ## @@ -222,9 +225,14 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { } else { assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled)) } + TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec, s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " + s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000) + +TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > initialFetchMessageConversionsTimeMs, + s"The `MessageConversionsTimeMs` in fetch request metric count is not incremented after 5 seconds. " + + s"init: $initialFetchMessageConversionsTimeMs final: ${TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)}", 5000) Review Comment: Good point. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
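The test change above snapshots a metric count, triggers the request, and then polls until the count has grown, because the metric is updated asynchronously after the response callback. As a rough sketch of that polling pattern (this is not Kafka's `TestUtils.waitUntilTrue`; the class `PollUtil` and its parameters are hypothetical names chosen for illustration):

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, for illustration only; not part of Kafka.
final class PollUtil {
    private PollUtil() { }

    /**
     * Polls the condition until it returns true or timeoutMs has elapsed.
     * Returns true if the condition was met in time, false on timeout or interrupt.
     */
    public static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A caller would read the initial count before issuing the request and then wait on `currentCount() > initialCount`, so the assertion does not race with the metric update.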
[GitHub] [kafka] philipnee commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
philipnee commented on code in PR #13301: URL: https://github.com/apache/kafka/pull/13301#discussion_r1117866050 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the + * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}. 
+ * + * @param <K> Record key type + * @param <V> Record value type + */ +class CompletedFetch<K, V> { + +private final Logger log; +private final SubscriptionState subscriptions; +private final boolean checkCrcs; +private final BufferSupplier decompressionBufferSupplier; +private final Deserializer<K> keyDeserializer; +private final Deserializer<V> valueDeserializer; +private final IsolationLevel isolationLevel; +public final TopicPartition partition; Review Comment: I think we need it public because it is in the internal package.
[GitHub] [kafka] philipnee commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
philipnee commented on code in PR #13301: URL: https://github.com/apache/kafka/pull/13301#discussion_r1117808261 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the + * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}. 
+ * + * @param Record key type + * @param Record value type + */ +class CompletedFetch { + +private final Logger log; +private final SubscriptionState subscriptions; +private final boolean checkCrcs; +private final BufferSupplier decompressionBufferSupplier; +private final Deserializer keyDeserializer; +private final Deserializer valueDeserializer; +private final IsolationLevel isolationLevel; +public final TopicPartition partition; +private final Iterator batches; +private final Set abortedProducerIds; +private final PriorityQueue abortedTransactions; +final FetchResponseData.PartitionData partitionData; +final FetchResponseMetricAggregator metricAggregator; +final short responseVersion; + +private int recordsRead; +private int bytesRead; +private RecordBatch currentBatch; +private Record lastRecord; +private CloseableIterator records; +long nextFetchOffset; +Optional lastEpoch; +boolean isConsumed = false; +private Exception cachedRecordException = null; +private boolean corruptLastRecord = false; +boolean initialized = false; + +CompletedFetch(LogContext logContext, + SubscriptionState subscriptions, + boolean checkCrcs, + BufferSupplier decompressionBufferSupplier, + Deserializer keyDeserializer, + Deserializer valueDeserializer, + IsolationLevel isolationLevel, + TopicPartition partition, + FetchResponseData.PartitionData partitionData, + FetchResponseMetricAggregator metricAggregator, + Iterator batches, + Long fetchOffset, + short responseVersion) { +this.log = logContext.logger(CompletedFetch.class); +this.subscriptions = subscriptions; +this.checkCrcs = checkCrcs; +this.decompressionBufferSupplier = decompressionBufferSupplier; +this.keyDeserializer = keyDeserializer; +this.valueDeserializer = valueDeserializer; +this.isolationLevel = isolationLevel; +this.partition = partition; +this.part
[GitHub] [kafka] satishd opened a new pull request, #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
satishd opened a new pull request, #13304: URL: https://github.com/apache/kafka/pull/13304 KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] chia7712 commented on pull request #13205: KAFKA-14680: Upgrade gradle version from 7.6 to 8.0.1
chia7712 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1444898374 Should we disable `spotlessScala` temporarily? Otherwise, Gradle tasks that depend on `spotlessScala` do not work, for example the common command `./gradlew clean build -x test`.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
guozhangwang commented on code in PR #13301: URL: https://github.com/apache/kafka/pull/13301#discussion_r1117834119 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the + * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}. 
+ * + * @param Record key type + * @param Record value type + */ +class CompletedFetch { + +private final Logger log; +private final SubscriptionState subscriptions; +private final boolean checkCrcs; +private final BufferSupplier decompressionBufferSupplier; +private final Deserializer keyDeserializer; +private final Deserializer valueDeserializer; +private final IsolationLevel isolationLevel; +public final TopicPartition partition; +private final Iterator batches; +private final Set abortedProducerIds; +private final PriorityQueue abortedTransactions; +final FetchResponseData.PartitionData partitionData; +final FetchResponseMetricAggregator metricAggregator; +final short responseVersion; Review Comment: The name here seems a bit misleading, I think it should be `requestVersion` right? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apac
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
vcrfxia commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1117838013 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A wrapper class for non-windowed key-value stores used within the DSL. 
All such stores are + * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. + * + * @param <K> The key type + * @param <V> The value type + */ +public class KeyValueStoreWrapper<K, V> implements StateStore { + +private final StateStore store; +private final StoreType storeType; + +private enum StoreType { +TIMESTAMPED, +VERSIONED; +} + +public KeyValueStoreWrapper(final ProcessorContext context, final String storeName) { +store = context.getStateStore(storeName); + +if (store instanceof TimestampedKeyValueStore) { Review Comment: It's interesting that these checks don't verify the generic types (i.e., `K` and `V`) but IIUC the code in `ProcessorContextImpl#getStateStore()` for casting state stores to the requested type does not check generic types either, so I don't believe there's a visible change in behavior here.
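The observation about generic types follows from Java's type erasure: an `instanceof` check can only test the raw class, and an unchecked cast to a parameterized type succeeds at runtime regardless of the actual type arguments. A minimal sketch using plain `java.util` collections (the class `ErasureDemo` and its methods are hypothetical and unrelated to the Kafka Streams code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, for illustration only.
final class ErasureDemo {
    private ErasureDemo() { }

    /** Mimics a lookup that hands back an object without static type arguments. */
    public static Object lookup() {
        List<String> strings = new ArrayList<>();
        strings.add("v");
        return strings;
    }

    /** The instanceof check sees only the raw List type, never the element type. */
    @SuppressWarnings("unchecked")
    public static List<Integer> asIntegerList(Object o) {
        if (o instanceof List) {
            return (List<Integer>) o; // unchecked cast: no runtime verification of <Integer>
        }
        throw new IllegalArgumentException("not a List");
    }

    /** Returns true if reading an element as Integer throws ClassCastException. */
    public static boolean failsOnRead(List<Integer> list) {
        try {
            Integer first = list.get(0); // the compiler-inserted cast to Integer fails here
            return first == null;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

The mismatch surfaces only when an element is read with the wrong type, which is why a type-argument check at the cast site would add nothing over what the existing lookup path already does.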
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
vcrfxia commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1117829563 ## streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java: ## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class KeyValueStoreWrapperTest { + +private static final String STORE_NAME = "kvStore"; +private static final String KEY = "k"; +private static final ValueAndTimestamp VALUE_AND_TIMESTAMP += ValueAndTimestamp.make("v", 8L); + +@Mock +private TimestampedKeyValueStore timestampedStore; +@Mock +private VersionedKeyValueStore versionedStore; +@Mock +private ProcessorContext context; +@Mock +private Query query; +@Mock +private PositionBound positionBound; +@Mock +private QueryConfig queryConfig; +@Mock +private QueryResult result; +@Mock +private Position position; + +private KeyValueStoreWrapper wrapper; + +@Test +public void shouldThrowOnNonTimestampedOrVersionedStore() { + 
when(context.getStateStore(STORE_NAME)).thenReturn(mock(KeyValueStore.class)); + +assertThrows(InvalidStateStoreException.class, () -> new KeyValueStoreWrapper<>(context, STORE_NAME)); +} + +@Test +public void shouldGetFromTimestampedStore() { +givenWrapperWithTimestampedStore(); +when(timestampedStore.get(KEY)).thenReturn(VALUE_AND_TIMESTAMP); + +assertThat(wrapper.get(KEY), equalTo(VALUE_AND_TIMESTAMP)); +} + +@Test +public void shouldGetFromVersionedStore() { +givenWrapperWithVersionedStore(); +when(versionedStore.get(KEY)).thenReturn( +new VersionedRecord<>( +VALUE_AND_TIMESTAMP.value(), +VALUE_AND_TIMESTAMP.timestamp()) +); + +assertThat(wrapper.get(KEY), equalTo(VALUE_AND_TIMESTAMP)); +} + +@Test +public void shouldGetNullFromTimestampedStore() { +givenWrapperWithTimestampedStore(); +when(timestampedStore.get(KEY)).thenReturn(null); + +assertThat(wrapper.get(KEY), nullValue()); +} + +@Test +public void shouldGetNullFromVersionedStore() { +givenWrapperWithVersionedStore(); +when(versionedStore.get(KEY)).thenReturn(null); + +assertThat(wrapper.get(KEY), nullValue()); +} + +@Test +public void shouldPutToTimestampedStore() { +givenWrapperWithTimestampedStore(); + +wrapper.put(KEY, VALUE_AND_TIMESTAMP.value(), VALUE_AND_TIMESTAMP.timestamp()); + +verify(timestampedStore).put(KEY, VALUE_AND_TIMESTAMP); +} + +@Test +public void shouldPutToVersionedStore() { +givenWrapperWithVersionedStore(); + +wrapper.put(KEY, VALUE_AND_TIMESTAMP.value(), VALUE_AND_TIMESTAMP.timestamp()); + +verify(versionedStore).put(KEY, VALUE_AND_TIMESTAMP.value
[jira] [Comment Edited] (KAFKA-14762) Remove 0_8_2_LATEST from rolling upgrade system test
[ https://issues.apache.org/jira/browse/KAFKA-14762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693391#comment-17693391 ] Greg Harris edited comment on KAFKA-14762 at 2/25/23 12:54 AM: --- Based on my understanding of the EOL policy: [https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy] which I'll reproduce here: {noformat} Given 3 releases a year and the fact that no one upgrades three times a year, we propose making sure (by testing!) that rolling upgrade can be done from each release in the past year (i.e. last 3 releases) to the latest version.{noformat} {noformat} We will also attempt, as a community to do bugfix releases as needed for the last 3 releases.{noformat} I believe that this change is compliant with the EOL policy, and does not require a KIP to implement. This ticket is merely effecting the EOL of the 0.8.2.2 version, which took place in October 2016. The system tests and documentation have been much more conservative than the EOL policy so far, and should continue to be for 0.9.0+ as there is not yet a reason to remove the compatibility tests for those versions. was (Author: gharris1727): Based on my understanding of the EOL policy: [https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy] which I'll reproduce here: {noformat} Given 3 releases a year and the fact that no one upgrades three times a year, we propose making sure (by testing!) that rolling upgrade can be done from each release in the past year (i.e. last 3 releases) to the latest version. We will also attempt, as a community to do bugfix releases as needed for the last 3 releases.{noformat} I believe that this change is compliant with the EOL policy, and does not require a KIP to implement. This ticket is merely effecting the EOL of the 0.8.2.2 version, which took place in October 2016. 
The system tests and documentation have been much more conservative than the EOL policy so far, and should continue to be for 0.9.0+ as there is not yet a reason to remove the compatibility tests for those versions. > Remove 0_8_2_LATEST from rolling upgrade system test > > > Key: KAFKA-14762 > URL: https://issues.apache.org/jira/browse/KAFKA-14762 > Project: Kafka > Issue Type: Task > Components: system tests >Reporter: Greg Harris >Priority: Minor > > Currently, the core/upgrade_test.py exercises rolling upgrades from various past Kafka > versions to the development version. The earliest version that it currently > tests is 0_8_2_LATEST (0.8.2.2), released October 2, 2015. > The test has a special case for that version, which does not contain a > copy of the `tools` jar. The test is written to make use of the tools jar for > assertions, but because the jar does not exist in 0.8.2.2, a jar from the > latest development version is used instead. For example, this has the effect > that when executing the 0.8.2.2 upgrade case, the 3.5.0-SNAPSHOT tools jar is > on the classpath with the 0.8.2.2 clients jar. > Because of this, development on the VerifiableProducer has needed to be > backwards compatible with the 0.8.2.2 clients jar, and this has led to code > duplication and other limitations on the maintenance that can be done to the > class. This appears to be mostly an artifact of how the testing is carried > out, as upgrades are typically performed without mixing jars from different > versions of Kafka. > In order to lift those limitations, we should eliminate this one version from > compatibility testing. Accompanying this change, we should update the latest > documentation to say "Upgrading to from any version 0.9.x through > " instead of 0.8.x, since that version will no longer be regularly > tested. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14762) Remove 0_8_2_LATEST from rolling upgrade system test
[ https://issues.apache.org/jira/browse/KAFKA-14762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693391#comment-17693391 ] Greg Harris commented on KAFKA-14762: - Based on my understanding of the EOL policy: [https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy] which I'll reproduce here: {noformat} Given 3 releases a year and the fact that no one upgrades three times a year, we propose making sure (by testing!) that rolling upgrade can be done from each release in the past year (i.e. last 3 releases) to the latest version. We will also attempt, as a community to do bugfix releases as needed for the last 3 releases.{noformat} I believe that this change is compliant with the EOL policy, and does not require a KIP to implement. This ticket is merely effecting the EOL of the 0.8.2.2 version, which took place in October 2016. The system tests and documentation have been much more conservative than the EOL policy so far, and should continue to be for 0.9.0+ as there is not yet a reason to remove the compatibility tests for those versions. > Remove 0_8_2_LATEST from rolling upgrade system test > > > Key: KAFKA-14762 > URL: https://issues.apache.org/jira/browse/KAFKA-14762 > Project: Kafka > Issue Type: Task > Components: system tests >Reporter: Greg Harris >Priority: Minor > > Currently, the core/upgrade_test.py exercises rolling upgrades from various past Kafka > versions to the development version. The earliest version that it currently > tests is 0_8_2_LATEST (0.8.2.2), released October 2, 2015. > The test has a special case for that version, which does not contain a > copy of the `tools` jar. The test is written to make use of the tools jar for > assertions, but because the jar does not exist in 0.8.2.2, a jar from the > latest development version is used instead. 
For example, this has the effect > that when executing the 0.8.2.2 upgrade case, the 3.5.0-SNAPSHOT tools jar is > on the classpath with the 0.8.2.2 clients jar. > Because of this, development on the VerifiableProducer has needed to be > backwards compatible with the 0.8.2.2 clients jar, and this has led to code > duplication and other limitations on the maintenance that can be done to the > class. This appears to be mostly an artifact of how the testing is carried > out, as upgrades are typically performed without mixing jars from different > versions of Kafka. > In order to lift those limitations, we should eliminate this one version from > compatibility testing. Accompanying this change, we should update the latest > documentation to say "Upgrading to from any version 0.9.x through > " instead of 0.8.x, since that version will no longer be regularly > tested.
[jira] [Commented] (KAFKA-14760) Move ThroughputThrottler, break connect-runtime dependency on tools
[ https://issues.apache.org/jira/browse/KAFKA-14760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693389#comment-17693389 ] Greg Harris commented on KAFKA-14760: - I opened a ticket for the above idea: KAFKA-14762, since it may be pursued separately. It may also require a KIP, since it would change the documentation about support for upgrades from 0.8.2. In the meantime, I'll look into duplicating the class or patching the system tests to substitute 0.9.0.0 jars instead of the dev version jars. > Move ThroughputThrottler, break connect-runtime dependency on tools > --- > > Key: KAFKA-14760 > URL: https://issues.apache.org/jira/browse/KAFKA-14760 > Project: Kafka > Issue Type: Task > Components: KafkaConnect, tools >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > > Currently there is only one dependency on the `tools` module, > `connect-runtime`. > This dependency is only for one class, the ThroughputThrottler. This class is > used by: > 1. tools main ProducerPerformance > 2. tools main VerifiableProducer > 3. runtime main SchemaSourceConnector > 4. runtime main VerifiableSourceConnector > 5. runtime test MonitorableSourceConnector > For KAFKA-14627, I want to be able to have `tools` (test) depend on > `connect-runtime` (test). This is because we are adding a connect-specific > command-line utility, and wish to re-use some of the existing connect test > infrastructure to unit test the new command-line utility. Unfortunately > naively adding this new dependency to tools causes a circular dependency that > prevents the project from building. > Instead of refactoring the connect-specific test utilities out to a new > package that both runtime and tools can depend on, it appears to make more > sense to move the more generic `ThroughputThrottler` class into some common > package. > This common package could be: > 1. clients > 2. server common > 3. 
some other existing package which would be a new dependency for tools > 4. a new package consisting of just the `ThroughputThrottler` class > I'm not sure which one of these makes the most sense, and would appreciate > guidance on what would make the most sense for ownership and maintenance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14762) Remove 0_8_2_LATEST from rolling upgrade system test
Greg Harris created KAFKA-14762: --- Summary: Remove 0_8_2_LATEST from rolling upgrade system test Key: KAFKA-14762 URL: https://issues.apache.org/jira/browse/KAFKA-14762 Project: Kafka Issue Type: Task Components: system tests Reporter: Greg Harris Currently, the core/upgrade_test.py exercises rolling from various past Kafka versions to the development version. The earliest version that it currently tests is 0_8_2_LATEST, 0.8.2.2, released October 2, 2015. The test has a special case for that version, which does not contain a copy of the `tools` jar. The test is written to make use of the tools jar for assertions, but because the jar does not exist in 0.8.2.2, a jar from the latest development version is used instead. For example, this has the effect that when executing the 0.8.2.2 upgrade case, the 3.5.0-SNAPSHOT tools jar is on the classpath with the 0.8.2.2 clients jar. Because of this, development on the VerifiableProducer has needed to be backwards compatible with the 0.8.2.2 clients jar, and this has led to code duplication and other limitations on the maintenance that can be done to the class. This appears to be mostly an artifact of how the testing is carried out, as upgrades are typically performed without mixing jars from different versions of Kafka. In order to lift those limitations, we should eliminate this one version from compatibility testing. Accompanying this change, we should update the latest documentation to say "Upgrading to from any version 0.9.x through " instead of 0.8.x, since that version will no longer be regularly tested.
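The special case described in the ticket above can be sketched roughly as follows. This is only an illustration in plain Java, not the actual system test code (which lives in the Python test harness). The class `ToolsJarSelectionSketch`, its methods, and the `keepSpecialCase` flag are hypothetical names, and the rule "only 0.8.x lacks a tools jar" is an assumption taken from the ticket text.

```java
public class ToolsJarSelectionSketch {
    // Version string quoted in the ticket for the development build.
    public static final String DEV_VERSION = "3.5.0-SNAPSHOT";

    // Assumption from the ticket: 0.8.2.2 ships no tools jar, 0.9.x+ does.
    public static boolean hasToolsJar(String kafkaVersion) {
        return !kafkaVersion.startsWith("0.8.");
    }

    // Picks which tools jar version ends up on the classpath next to the
    // clients jar of the given version.
    public static String toolsJarVersionFor(String clientsVersion, boolean keepSpecialCase) {
        if (hasToolsJar(clientsVersion)) {
            // Co-versioned jars: no cross-version compatibility concern.
            return clientsVersion;
        }
        if (keepSpecialCase) {
            // The special case: a modern tools jar runs against very old clients.
            return DEV_VERSION;
        }
        // With the special case removed, such versions are simply not testable.
        throw new IllegalArgumentException("No tools jar available for " + clientsVersion);
    }

    public static void main(String[] args) {
        System.out.println(toolsJarVersionFor("0.9.0.1", true));
        System.out.println(toolsJarVersionFor("0.8.2.2", true));
    }
}
```

With the special case kept, the second call mixes a development tools jar with 0.8.2.2 clients, which is the constraint on VerifiableProducer that the ticket proposes to lift.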
[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
kirktrue commented on code in PR #12813: URL: https://github.com/apache/kafka/pull/12813#discussion_r1117823636 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons // thus it is not safe to reassign the sequence. failBatch(batch, response, batch.attempts() < this.retries); } -if (error.exception() instanceof InvalidMetadataException) { +if (error.exception() instanceof InvalidMetadataException || error.exception() instanceof TimeoutException) { Review Comment: @dajac For disconnected/timed out connections, this is the call stack:

```java
Sender.poll()
NetworkClient.poll()
DefaultMetadataUpdater.handleServerDisconnect()
Metadata.requestUpdate()
```

However, `SenderTest` uses `MockClient` which doesn't have that same logic flow. So if I remove that clause of the `if` statement, the test, as written, fails.
[jira] [Commented] (KAFKA-14760) Move ThroughputThrottler, break connect-runtime dependency on tools
[ https://issues.apache.org/jira/browse/KAFKA-14760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693382#comment-17693382 ] Greg Harris commented on KAFKA-14760: - And for another alternative that _does_ allow us to start depending on modern functionality in tools: remove `0.8.2` from the upgrade tests. That version is the only case where the test infrastructure makes an exception and includes a modern tools jar with very old client jars. For 0.9.x+, the tools jar exists and is used in the test infrastructure with the co-versioned clients jar, and doesn't have any cross-version concerns. This was not a tenable solution the last time this was tried, as it was the only version that the upgrade test was run against. But now, with 3 major versions gone by, perhaps it is time to remove this backwards compatibility check and start improving the tools jar. Here's the special case that we can remove: https://github.com/apache/kafka/blob/62431dca700fb2c7c3afe1a7c9eb07fe336f9b04/tests/kafkatest/services/verifiable_client.py#L253-L260 > Move ThroughputThrottler, break connect-runtime dependency on tools > --- > > Key: KAFKA-14760 > URL: https://issues.apache.org/jira/browse/KAFKA-14760 > Project: Kafka > Issue Type: Task > Components: KafkaConnect, tools >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > > Currently there is only one dependency on the `tools` module, > `connect-runtime`. > This dependency is only for one class, the ThroughputThrottler. This class is > used by: > 1. tools main ProducerPerformance > 2. tools main VerifiableProducer > 3. runtime main SchemaSourceConnector > 4. runtime main VerifiableSourceConnector > 5. runtime test MonitorableSourceConnector > For KAFKA-14627, I want to be able to have `tools` (test) depend on > `connect-runtime` (test). 
This is because we are adding a connect-specific > command-line utility, and wish to re-use some of the existing connect test > infrastructure to unit test the new command-line utility. Unfortunately > naively adding this new dependency to tools causes a circular dependency that > prevents the project from building. > Instead of refactoring the connect-specific test utilities out to a new > package that both runtime and tools can depend on, it appears to make more > sense to move the more generic `ThroughputThrottler` class into some common > package. > This common package could be: > 1. clients > 2. server common > 3. some other existing package which would be a new dependency for tools > 4. a new package consisting of just the `ThroughputThrottler` class > I'm not sure which one of these makes the most sense, and would appreciate > guidance on what would make the most sense for ownership and maintenance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang merged pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests
guozhangwang merged PR #13021: URL: https://github.com/apache/kafka/pull/13021
[GitHub] [kafka] guozhangwang commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
guozhangwang commented on PR #13231: URL: https://github.com/apache/kafka/pull/13231#issuecomment-1444715595 @dajac @hachikuji if you do not have further comments, we can proceed and merge it then?
[jira] [Resolved] (KAFKA-14685) TierStateMachine interface for building remote aux log
[ https://issues.apache.org/jira/browse/KAFKA-14685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-14685. - Fix Version/s: 3.5.0 Resolution: Fixed merged the PR to trunk. > TierStateMachine interface for building remote aux log > -- > > Key: KAFKA-14685 > URL: https://issues.apache.org/jira/browse/KAFKA-14685 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Matthew Wong >Assignee: Matthew Wong >Priority: Major > Fix For: 3.5.0 > > > To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can > introduce an interface to manage state transitions of building the remote aux > log asynchronously -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14685) TierStateMachine interface for building remote aux log
[ https://issues.apache.org/jira/browse/KAFKA-14685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reassigned KAFKA-14685: --- Assignee: Matthew Wong > TierStateMachine interface for building remote aux log > -- > > Key: KAFKA-14685 > URL: https://issues.apache.org/jira/browse/KAFKA-14685 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Matthew Wong >Assignee: Matthew Wong >Priority: Major > > To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can > introduce an interface to manage state transitions of building the remote aux > log asynchronously -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee opened a new pull request, #13303: KAFKA-14761 Adding integration test for the prototype consumer
philipnee opened a new pull request, #13303: URL: https://github.com/apache/kafka/pull/13303 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2
guozhangwang commented on code in PR #13300: URL: https://github.com/apache/kafka/pull/13300#discussion_r1117803154 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java: ## @@ -767,20 +768,6 @@ public static void addInvocationRateToSensor(final Sensor sensor, ); } -public static void addInvocationRateAndCountToSensor(final Sensor sensor, Review Comment: This function is not used in prod code, hence cleaning it up. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java: ## @@ -213,7 +272,7 @@ public static Sensor recordLatenessSensor(final String threadId, public static Sensor droppedRecordsSensor(final String threadId, final String taskId, final StreamsMetricsImpl streamsMetrics) { -return invocationRateAndCountSensor( +return invocationRateAndTotalSensor( Review Comment: This is a piggy-backed metric fix: we should use cumulativeSum rather than cumulativeCount for dropped records. Today most callers use `sensor.record()`, which only increments by 1, so the two are effectively the same, but cumulativeCount would be wrong if we ever record a non-one value in the future. ## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java: ## @@ -146,13 +144,6 @@ private StateStoreMetrics() {} private static final String SUPPRESSION_BUFFER_SIZE_MAX_DESCRIPTION = MAX_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_SIZE_DESCRIPTION; -private static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop"; Review Comment: This metric is removed as part of KIP-743, and it's only used in tests (which I also cleaned up as a piggy-back).
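To make the cumulativeSum versus cumulativeCount point concrete, here is a minimal plain-Java sketch. It deliberately avoids the Kafka metrics classes; `CountVersusSum` and its two methods are hypothetical names that only mimic the idea of "count the calls" versus "add up the recorded values".

```java
import java.util.List;

public class CountVersusSum {
    // Counts how many times a value was recorded, ignoring the value itself.
    public static long cumulativeCount(List<Double> recordedValues) {
        return recordedValues.size();
    }

    // Adds up the recorded values themselves.
    public static double cumulativeSum(List<Double> recordedValues) {
        double total = 0.0;
        for (double value : recordedValues) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        // Three dropped records, recorded one at a time: both agree.
        List<Double> oneAtATime = List.of(1.0, 1.0, 1.0);
        // Three dropped records, recorded in a single call with value 3.
        List<Double> batched = List.of(3.0);

        System.out.println(cumulativeCount(oneAtATime) + " vs " + cumulativeSum(oneAtATime));
        System.out.println(cumulativeCount(batched) + " vs " + cumulativeSum(batched));
    }
}
```

In the batched case the count reports one event while the sum still reports three dropped records, which is the vulnerability the review comment describes.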
[jira] [Commented] (KAFKA-14760) Move ThroughputThrottler, break connect-runtime dependency on tools
[ https://issues.apache.org/jira/browse/KAFKA-14760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693377#comment-17693377 ] Greg Harris commented on KAFKA-14760: - Looking into this more, it appears that this was done previously and then reverted, because classes in the `tools` jar are run against a classpath containing potentially extremely old client versions, back to 0.8.2 in system tests. It appears that it is unsafe to depend on modern functionality in the tools jar, at least given the current way that upgrades are being tested and the tools jar is being used. Renaming the class and moving it to the clients jar would be depending on modern functionality, since the renamed class is only present after the refactor. So it's possible that we need to duplicate the ThroughputThrottler functionality, or remove the dependency by replacing it with some other class rather than moving and renaming the copy in tools. > Move ThroughputThrottler, break connect-runtime dependency on tools > --- > > Key: KAFKA-14760 > URL: https://issues.apache.org/jira/browse/KAFKA-14760 > Project: Kafka > Issue Type: Task > Components: KafkaConnect, tools >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > > Currently there is only one dependency on the `tools` module, > `connect-runtime`. > This dependency is only for one class, the ThroughputThrottler. This class is > used by: > 1. tools main ProducerPerformance > 2. tools main VerifiableProducer > 3. runtime main SchemaSourceConnector > 4. runtime main VerifiableSourceConnector > 5. runtime test MonitorableSourceConnector > For KAFKA-14627, I want to be able to have `tools` (test) depend on > `connect-runtime` (test). This is because we are adding a connect-specific > command-line utility, and wish to re-use some of the existing connect test > infrastructure to unit test the new command-line utility. 
Unfortunately > naively adding this new dependency to tools causes a circular dependency that > prevents the project from building. > Instead of refactoring the connect-specific test utilities out to a new > package that both runtime and tools can depend on, it appears to make more > sense to move the more generic `ThroughputThrottler` class into some common > package. > This common package could be: > 1. clients > 2. server common > 3. some other existing package which would be a new dependency for tools > 4. a new package consisting of just the `ThroughputThrottler` class > I'm not sure which one of these makes the most sense, and would appreciate > guidance on what would make the most sense for ownership and maintenance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] junrao merged pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
junrao merged PR #13206: URL: https://github.com/apache/kafka/pull/13206
[jira] [Created] (KAFKA-14761) Integration Tests for the New Consumer Implementation
Philip Nee created KAFKA-14761: -- Summary: Integration Tests for the New Consumer Implementation Key: KAFKA-14761 URL: https://issues.apache.org/jira/browse/KAFKA-14761 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee This Jira tracks the integration testing effort for the new consumer we are implementing.
[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693369#comment-17693369 ] Chris Egerton commented on KAFKA-14750: --- Thanks for filing this, [~morozov]! I've done some local testing and confirmed that the issue affects the current trunk, and after doing some digging, I suspect it goes back pretty far. Initially I suspected that this was simply a matter of adjusting the task's consumer's [metadata refresh interval|https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms] to be lower, which would cause it to detect changes in its topic regex subscription sooner. However, even after making that tweak, issues still surfaced. This is due to the fact that, after a topic is deleted, the task's [consumer rebalance listener|https://github.com/apache/kafka/blob/400ba0aeaeb6c460069d5ad12b1b3976ab447332/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L699-L784] is notified of the revocation of the partitions for that topic, which triggers an attempt to commit offsets–including offsets for the revoked topic partitions. There are a couple of approaches I can think of for this: # Adjust the Connect runtime's behavior to somehow discern the set of still-existing topic partitions before committing offsets, and skip committing offsets for recently-deleted topic partitions # Tweak the consumer logic to invoke {{ConsumerRebalanceListener::onPartitionsLost}} instead of {{ConsumerRebalanceListener::onPartitionsRevoked}} for deleted topic partitions. Given that option 1 is inherently subject to race conditions, I'd prefer to pursue option 2 initially. However, I'm not too familiar with the clients side of things, so it'd be nice to get a second opinion. [~jasong35] [~pnee] if either of you get a chance, would you mind weighing in here? 
TL; DR: Should we be treating deleted topic partitions as "lost" instead of "revoked" with consumer rebalance listeners? > Sink connector fails if a topic matching its topics.regex gets deleted > -- > > Key: KAFKA-14750 > URL: https://issues.apache.org/jira/browse/KAFKA-14750 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.1 >Reporter: Sergei Morozov >Priority: Major > > Steps to reproduce: > # In {{{}config/connect-standalone.properties{}}}, set: > {code:bash} > plugin.path=libs/connect-file-3.3.1.jar > {code} > # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line > and add this one: > {code:bash} > topics.regex=connect-test-.* > {code} > # Start zookeeper: > {code:bash} > bin/zookeeper-server-start.sh config/zookeeper.properties > {code} > # Start the brokers: > {code:bash} > bin/kafka-server-start.sh config/server.properties > {code} > # Start the file sink connector: > {code:bash} > bin/connect-standalone.sh config/connect-standalone.properties > config/connect-file-sink.properties > {code} > # Create topics for the sink connector to subscribe to: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --create \ > --topic connect-test-$j > done & > done > wait > {code} > # Wait until all the created topics are assigned to the connector. Check the > number of partitions to be > 0 in the output of: > {code:bash} > bin/kafka-consumer-groups.sh \ > --bootstrap-server localhost:9092 \ > --group connect-local-file-sink \ > --describe --members > {code} > # Delete the created topics: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --delete \ > --topic connect-test-$j > echo Deleted topic connect-test-$j. 
> done & > done > wait > {code} > # Observe the connector fail with the following error: > {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms > expired before the position for partition connect-test-211-0 could be > determined > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
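Option 2 from the comment above (treat partitions of deleted topics as "lost" rather than "revoked") can be sketched as follows. This is a hedged illustration in plain Java, not the consumer's real rebalance code: `RebalanceListener` is a simplified, hypothetical stand-in for `ConsumerRebalanceListener`, partitions are plain `topic-partition` strings, and `notifyRemoved` is an invented helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class LostVersusRevokedSketch {
    // Hypothetical, simplified listener; not the real Kafka interface.
    public interface RebalanceListener {
        void onPartitionsRevoked(List<String> partitions);
        void onPartitionsLost(List<String> partitions);
    }

    // Records what a sink task would do: commit offsets on revocation,
    // skip the commit (just drop local state) on loss.
    public static class RecordingListener implements RebalanceListener {
        public final List<String> committed = new ArrayList<>();
        public final List<String> dropped = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(List<String> partitions) {
            committed.addAll(partitions);
        }

        @Override
        public void onPartitionsLost(List<String> partitions) {
            dropped.addAll(partitions);
        }
    }

    // Routes each removed partition to "lost" if its topic no longer exists,
    // and to "revoked" otherwise.
    public static void notifyRemoved(List<String> removed,
                                     Set<String> existingTopics,
                                     RebalanceListener listener) {
        List<String> revoked = new ArrayList<>();
        List<String> lost = new ArrayList<>();
        for (String topicPartition : removed) {
            // Everything before the final '-' is the topic name.
            String topic = topicPartition.substring(0, topicPartition.lastIndexOf('-'));
            if (existingTopics.contains(topic)) {
                revoked.add(topicPartition);
            } else {
                lost.add(topicPartition);
            }
        }
        if (!revoked.isEmpty()) {
            listener.onPartitionsRevoked(revoked);
        }
        if (!lost.isEmpty()) {
            listener.onPartitionsLost(lost);
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        notifyRemoved(List.of("connect-test-1-0", "connect-test-211-0"),
                      Set.of("connect-test-1"), listener);
        System.out.println("commit attempted for: " + listener.committed);
        System.out.println("commit skipped for:   " + listener.dropped);
    }
}
```

Under this routing, no offset commit is attempted for the partition of the deleted topic, so the position lookup that times out in the reported error would not be triggered by the listener.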
[jira] [Created] (KAFKA-14760) Move ThroughputThrottler, break connect-runtime dependency on tools
Greg Harris created KAFKA-14760: --- Summary: Move ThroughputThrottler, break connect-runtime dependency on tools Key: KAFKA-14760 URL: https://issues.apache.org/jira/browse/KAFKA-14760 Project: Kafka Issue Type: Task Components: KafkaConnect, tools Reporter: Greg Harris Assignee: Greg Harris Currently there is only one dependency on the `tools` module, `connect-runtime`. This dependency is only for one class, the ThroughputThrottler. This class is used by: 1. tools main ProducerPerformance 2. tools main VerifiableProducer 3. runtime main SchemaSourceConnector 4. runtime main VerifiableSourceConnector 5. runtime test MonitorableSourceConnector For KAFKA-14627, I want to be able to have `tools` (test) depend on `connect-runtime` (test). This is because we are adding a connect-specific command-line utility, and wish to re-use some of the existing connect test infrastructure to unit test the new command-line utility. Unfortunately naively adding this new dependency to tools causes a circular dependency that prevents the project from building. Instead of refactoring the connect-specific test utilities out to a new package that both runtime and tools can depend on, it appears to make more sense to move the more generic `ThroughputThrottler` class into some common package. This common package could be: 1. clients 2. server common 3. some other existing package which would be a new dependency for tools 4. a new package consisting of just the `ThroughputThrottler` class I'm not sure which one of these makes the most sense, and would appreciate guidance on what would make the most sense for ownership and maintenance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1117773343 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java: ## @@ -33,7 +32,7 @@ public TimestampedKeyValueStoreMaterializer(final MaterializedInternal> materialize() { +public StoreBuilder materialize() { Review Comment: Why can't we keep the generic type here? I understand that we need to make it `` in `KTableImpl` (and the "graph node" code below) but not sure why we need `` here, too?
[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1117769530 ## streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java: ## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class KeyValueStoreWrapperTest { + +private static final String STORE_NAME = "kvStore"; +private static final String KEY = "k"; +private static final ValueAndTimestamp VALUE_AND_TIMESTAMP += ValueAndTimestamp.make("v", 8L); + +@Mock +private TimestampedKeyValueStore timestampedStore; +@Mock +private VersionedKeyValueStore versionedStore; +@Mock +private ProcessorContext context; +@Mock +private Query query; +@Mock +private PositionBound positionBound; +@Mock +private QueryConfig queryConfig; +@Mock +private QueryResult result; +@Mock +private Position position; + +private KeyValueStoreWrapper wrapper; + +@Test +public void shouldThrowOnNonTimestampedOrVersionedStore() { + 
when(context.getStateStore(STORE_NAME)).thenReturn(mock(KeyValueStore.class)); + +assertThrows(InvalidStateStoreException.class, () -> new KeyValueStoreWrapper<>(context, STORE_NAME)); +} + +@Test +public void shouldGetFromTimestampedStore() { +givenWrapperWithTimestampedStore(); +when(timestampedStore.get(KEY)).thenReturn(VALUE_AND_TIMESTAMP); + +assertThat(wrapper.get(KEY), equalTo(VALUE_AND_TIMESTAMP)); +} + +@Test +public void shouldGetFromVersionedStore() { +givenWrapperWithVersionedStore(); +when(versionedStore.get(KEY)).thenReturn( +new VersionedRecord<>( +VALUE_AND_TIMESTAMP.value(), +VALUE_AND_TIMESTAMP.timestamp()) +); + +assertThat(wrapper.get(KEY), equalTo(VALUE_AND_TIMESTAMP)); +} + +@Test +public void shouldGetNullFromTimestampedStore() { +givenWrapperWithTimestampedStore(); +when(timestampedStore.get(KEY)).thenReturn(null); + +assertThat(wrapper.get(KEY), nullValue()); +} + +@Test +public void shouldGetNullFromVersionedStore() { +givenWrapperWithVersionedStore(); +when(versionedStore.get(KEY)).thenReturn(null); + +assertThat(wrapper.get(KEY), nullValue()); +} + +@Test +public void shouldPutToTimestampedStore() { +givenWrapperWithTimestampedStore(); + +wrapper.put(KEY, VALUE_AND_TIMESTAMP.value(), VALUE_AND_TIMESTAMP.timestamp()); + +verify(timestampedStore).put(KEY, VALUE_AND_TIMESTAMP); +} + +@Test +public void shouldPutToVersionedStore() { +givenWrapperWithVersionedStore(); + +wrapper.put(KEY, VALUE_AND_TIMESTAMP.value(), VALUE_AND_TIMESTAMP.timestamp()); + +verify(versionedStore).put(KEY, VALUE_AND_TIMESTAMP.value()
[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1117766784 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java: ## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A wrapper class for non-windowed key-value stores used within the DSL. 
All such stores are + * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. + * + * @param <K> The key type + * @param <V> The value type + */ +public class KeyValueStoreWrapper<K, V> implements StateStore { + +private TimestampedKeyValueStore<K, V> timestampedStore = null; +private VersionedKeyValueStore<K, V> versionedStore = null; + +public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) { +try { +// first try timestamped store +timestampedStore = context.getStateStore(storeName); +return; +} catch (final ClassCastException e) { +// ignore since could be versioned store instead +} + +try { +// next try versioned store +versionedStore = context.getStateStore(storeName); +} catch (final ClassCastException e) { +throw new InvalidStateStoreException("KTable source state store must implement either TimestampedKeyValueStore or VersionedKeyValueStore."); +} +} + +public ValueAndTimestamp<V> get(final K key) { +if (timestampedStore != null) { +return timestampedStore.get(key); +} +if (versionedStore != null) { +final VersionedRecord<V> versionedRecord = versionedStore.get(key); +return versionedRecord == null +? null +: ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp()); +} +throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store"); +} + +public void put(final K key, final V value, final long timestamp) { +if (timestampedStore != null) { +timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp)); +return; +} +if (versionedStore != null) { +versionedStore.put(key, value, timestamp); +return; +} +throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store"); +} + +public StateStore getStore() { Review Comment: Do we really need this method? It seems whoever uses `KeyValueStoreWrapper` can keep a handle of the store themselves? 
If we think it's convenient to keep this helper, should we add a `Store store` member variable, and implement this one as a simple `return store`? (Similar for `name()`, `init()` etc. below?) If we don't get the correct stores passed into the constructor we fail anyway, and it seems like a lot of unnecessary boilerplate code in those "non-functional helpers". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
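The suggestion in this review comment can be sketched roughly as follows. This is only an illustration, not the code from the PR: `SimpleStore`, `FakeTimestampedStore`, `FakeVersionedStore`, and `StoreWrapperSketch` are hypothetical stand-ins for Kafka's `StateStore`, the two supported store types, and `KeyValueStoreWrapper`. The idea shown is that the typed fields are kept for the functional methods, while a single `store` field serves every pass-through helper.

```java
// Hypothetical stand-in for Kafka's StateStore; not the real interface.
interface SimpleStore {
    String name();
    boolean isOpen();
}

// Hypothetical stand-ins for the two supported store flavors.
final class FakeTimestampedStore implements SimpleStore {
    public String name() { return "timestamped"; }
    public boolean isOpen() { return true; }
}

final class FakeVersionedStore implements SimpleStore {
    public String name() { return "versioned"; }
    public boolean isOpen() { return true; }
}

final class StoreWrapperSketch {
    // Typed handles, used only where behavior differs between store types.
    private FakeTimestampedStore timestampedStore = null;
    private FakeVersionedStore versionedStore = null;
    // Single untyped handle shared by all pass-through helpers.
    private final SimpleStore store;

    StoreWrapperSketch(final SimpleStore candidate) {
        if (candidate instanceof FakeTimestampedStore) {
            timestampedStore = (FakeTimestampedStore) candidate;
        } else if (candidate instanceof FakeVersionedStore) {
            versionedStore = (FakeVersionedStore) candidate;
        } else {
            throw new IllegalArgumentException("unsupported store");
        }
        store = candidate;
    }

    // Pass-through helpers need no branching on the store type.
    SimpleStore getStore() { return store; }
    String name() { return store.name(); }
    boolean isOpen() { return store.isOpen(); }

    boolean isTimestamped() { return timestampedStore != null; }
    boolean isVersioned() { return versionedStore != null; }
}
```

Because the constructor already fails for unsupported stores, the helpers can delegate unconditionally, which is the boilerplate reduction the comment asks about.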
[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1117760309 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java: ## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A wrapper class for non-windowed key-value stores used within the DSL. 
All such stores are + * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. + * + * @param <K> The key type + * @param <V> The value type + */ +public class KeyValueStoreWrapper<K, V> implements StateStore { + +private TimestampedKeyValueStore<K, V> timestampedStore = null; +private VersionedKeyValueStore<K, V> versionedStore = null; + +public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) { +try { +// first try timestamped store +timestampedStore = context.getStateStore(storeName); +return; +} catch (final ClassCastException e) { +// ignore since could be versioned store instead +} + +try { +// next try versioned store +versionedStore = context.getStateStore(storeName); +} catch (final ClassCastException e) { +throw new InvalidStateStoreException("KTable source state store must implement either TimestampedKeyValueStore or VersionedKeyValueStore."); Review Comment: Should we include the type of the store we found in the error message?
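A minimal sketch of what including the found type in the message could look like. It is an assumption about one possible shape, not the change made in the PR: `StoreTypeCheckSketch`, `TimestampedLike`, and `VersionedLike` are hypothetical names, and the check uses `instanceof` rather than the `ClassCastException` handling in the quoted diff.

```java
final class StoreTypeCheckSketch {
    // Hypothetical marker interfaces standing in for
    // TimestampedKeyValueStore and VersionedKeyValueStore.
    interface TimestampedLike { }
    interface VersionedLike { }

    // Builds the error text, appending the runtime class of the store that was found.
    static String describeUnsupported(final Object store) {
        final String found = store == null ? "null" : store.getClass().getName();
        return "KTable source state store must implement either TimestampedKeyValueStore"
            + " or VersionedKeyValueStore, but found: " + found;
    }

    // Rejects any store that is neither of the two supported kinds.
    static void validate(final Object store) {
        if (!(store instanceof TimestampedLike) && !(store instanceof VersionedLike)) {
            throw new IllegalStateException(describeUnsupported(store));
        }
    }
}
```

Reporting the concrete class name tells the user which store was actually registered under the name, which is usually the first thing needed to debug a misconfigured topology.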
[GitHub] [kafka] mjsax merged pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
mjsax merged PR #13252: URL: https://github.com/apache/kafka/pull/13252
[GitHub] [kafka] mjsax commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
mjsax commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1117748690 ## streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java: ## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import 
org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class MeteredVersionedKeyValueStoreTest { + +private static final String STORE_NAME = "versioned_store"; +private static final Serde<String> STRING_SERDE = new StringSerde(); +private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE); +private static final String METRICS_SCOPE = "scope"; +private static final String STORE_LEVEL_GROUP = "stream-state-metrics"; +private static final String APPLICATION_ID = "test-app"; +private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology"); + +private static final String KEY = "k"; +private static final String VALUE = "v"; +private static final long TIMESTAMP = 10L; +private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY)); +private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer() +.serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP)); + +private final VersionedBytesStore inner = mock(VersionedBytesStore.class); +private final Metrics metrics = new Metrics(); +private final Time mockTime = new MockTime(); +private final String threadId = Thread.currentThread().getName(); +private InternalProcessorContext context = mock(InternalProcessorContext.class); +private Map<String, String> tags; + +private MeteredVersionedKeyValueStore<String, String> store; + +@Before +public void setUp() { +when(inner.name()).thenReturn(STORE_NAME); +when(context.metrics()).thenReturn(new St
[GitHub] [kafka] gharris1727 opened a new pull request, #13302: KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new test-plugins module
gharris1727 opened a new pull request, #13302: URL: https://github.com/apache/kafka/pull/13302 These connectors do not belong in connect-runtime, and there is special code to hide them from the REST API. Instead, these connectors should be excluded by the build process, and added to the classpath for testing. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-14759) Move test-only connectors from connect-runtime to test-specific module
Greg Harris created KAFKA-14759: --- Summary: Move test-only connectors from connect-runtime to test-specific module Key: KAFKA-14759 URL: https://issues.apache.org/jira/browse/KAFKA-14759 Project: Kafka Issue Type: Task Components: KafkaConnect Reporter: Greg Harris Assignee: Greg Harris The following plugins are included in connect-runtime, but are excluded from the `/connector-plugins` REST endpoint: 1. MockSinkConnector 2. MockSourceConnector 3. SchemaSourceConnector 4. VerifiableSinkConnector 5. VerifiableSourceConnector These plugins are currently in use in the connect-runtime tests and system tests. They are not mentioned in the public documentation, and should not be in general use by users or downstream projects. In order to remove the special case in the REST resources to exclude these plugins, and reduce the attack surface area of a default Connect installation, these should be moved into a new internal module that is intentionally added when needed for running tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kirktrue commented on pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
kirktrue commented on PR #13301: URL: https://github.com/apache/kafka/pull/13301#issuecomment-1444580426 @hachikuji @rajinisivaram @guozhangwang @vvcephei @philipnee This is another PR that "simply" refactors out the inner classes from the `Fetcher`. I broke it out into a separate PR because it's very noisy for the forthcoming PR.
[jira] [Resolved] (KAFKA-14540) DataOutputStreamWritable#writeByteBuffer writes the wrong portion of the parameterized buffer
[ https://issues.apache.org/jira/browse/KAFKA-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Marshall resolved KAFKA-14540. -- Resolution: Fixed Fixed by https://github.com/apache/kafka/pull/13032 > DataOutputStreamWritable#writeByteBuffer writes the wrong portion of the > parameterized buffer > - > > Key: KAFKA-14540 > URL: https://issues.apache.org/jira/browse/KAFKA-14540 > Project: Kafka > Issue Type: Bug >Reporter: Michael Marshall >Priority: Minor > > The method DataOutputStreamWritable#writeByteBuffer uses the buffer's > position instead of its arrayOffset when writing the buffer to the output > stream. As a result, the resulting buffer is corrupted.
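To illustrate the class of bug described in this issue, the sketch below writes the readable portion of an array-backed `ByteBuffer` to a stream. It is not the patch from the linked PR; `ByteBufferWriteSketch`, `writeRemaining`, and `demo` are hypothetical names. It relies only on standard `java.nio` behavior: for an array-backed buffer, the readable bytes start at `arrayOffset() + position()` in the backing array and span `remaining()` bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class ByteBufferWriteSketch {
    // Writes exactly the bytes between position and limit, leaving the buffer untouched.
    static void writeRemaining(final DataOutputStream out, final ByteBuffer buf) throws IOException {
        if (buf.hasArray()) {
            // The backing array may be shared, so both offsets must be applied.
            out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
        } else {
            // Direct buffers have no accessible array; copy via a duplicate.
            final byte[] copy = new byte[buf.remaining()];
            buf.duplicate().get(copy);
            out.write(copy);
        }
    }

    // Builds a slice whose arrayOffset() is non-zero and writes its remaining bytes.
    static byte[] demo() {
        final ByteBuffer whole = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5});
        whole.position(2);
        final ByteBuffer slice = whole.slice(); // views bytes 2..5, arrayOffset() == 2
        slice.position(1);                      // readable bytes are now 3, 4, 5
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            writeRemaining(new DataOutputStream(bytes), slice);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Using `position()` alone as the array index would start reading at index 1 of the shared array in `demo()`, which is outside the slice and yields the wrong bytes.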
[jira] [Updated] (KAFKA-14758) Extract inner classes from Fetcher for reuse in refactoring
[ https://issues.apache.org/jira/browse/KAFKA-14758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14758: -- Reviewer: Jason Gustafson > Extract inner classes from Fetcher for reuse in refactoring > --- > > Key: KAFKA-14758 > URL: https://issues.apache.org/jira/browse/KAFKA-14758 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{Fetcher}}. > This task includes refactoring {{Fetcher}} by extracting out the inner > classes into top-level classes (though still in {{internal}}) so that those > classes can be referenced by forthcoming refactored fetch logic.
[GitHub] [kafka] kirktrue opened a new pull request, #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
kirktrue opened a new pull request, #13301: URL: https://github.com/apache/kafka/pull/13301 The `Fetcher` class is used internally by the `KafkaConsumer` to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored `Fetcher`. This task includes refactoring `Fetcher` by extracting out the inner classes into top-level classes (though still in internal) so that those classes can be referenced by forthcoming refactored fetch logic. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Reopened] (KAFKA-4106) Consumer / add configure method to PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-4106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-4106: > Consumer / add configure method to PartitionAssignor interface > -- > > Key: KAFKA-4106 > URL: https://issues.apache.org/jira/browse/KAFKA-4106 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 0.10.0.1 >Reporter: Florian Hussonnois >Assignee: Jason Gustafson >Priority: Minor > > Currently, we can implement a custom PartitionAssignor which will forward > user data that will be used during the assignment protocol. For example, > data can be used to implement a rack-aware assignor. > However, currently we cannot dynamically configure a PartitionAssignor > instance. > It would be nice to add a method configure(Map<String, ?>) to the > PartitionAssignor interface. Then, this method will be invoked by the > KafkaConsumer on each assignor, as is done for deserializers. > The code modifications are pretty straightforward but involve modifying the > public interface PartitionAssignor. Does that mean this JIRA needs a KIP? > I can contribute to that improvement.
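The request in this issue could look roughly like the sketch below. Everything here is hypothetical and self-contained: `ConfigurableAssignorSketch` is not Kafka's `PartitionAssignor`, `RackAwareAssignorSketch` is an invented implementation, and the `assignor.rack` key is made up for illustration. It only shows the shape of a `configure(Map<String, ?>)` hook that a consumer would call once per assignor before use.

```java
import java.util.Map;

// Hypothetical interface; not Kafka's actual PartitionAssignor.
interface ConfigurableAssignorSketch {
    // Called once with the client configuration before the assignor is used.
    void configure(Map<String, ?> configs);

    String name();
}

final class RackAwareAssignorSketch implements ConfigurableAssignorSketch {
    private String rack = "unknown";

    @Override
    public void configure(final Map<String, ?> configs) {
        // "assignor.rack" is an invented key used only for this sketch.
        final Object value = configs.get("assignor.rack");
        if (value != null) {
            rack = value.toString();
        }
    }

    @Override
    public String name() {
        return "rack-aware";
    }

    String rack() {
        return rack;
    }
}
```

With such a hook, the rack no longer has to be hard-coded in the assignor class or smuggled in through static state.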
[jira] [Reopened] (KAFKA-3117) Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance
[ https://issues.apache.org/jira/browse/KAFKA-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-3117: > Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance > --- > > Key: KAFKA-3117 > URL: https://issues.apache.org/jira/browse/KAFKA-3117 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Affects Versions: 0.9.0.0 > Environment: oracle java764bit > ubuntu 13.10 >Reporter: edwardt >Assignee: Jason Gustafson >Priority: Major > Labels: newbie, test, transient-unit-test-failure > > java.lang.AssertionError: Expected partitions [topic-0, topic-1, topic2-0, > topic2-1] but actually got [topic-0, topic-1] > at org.junit.Assert.fail(Assert.java:88) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:730) > at > kafka.api.BaseConsumerTest.testAutoCommitOnRebalance(BaseConsumerTest.scala:125) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:22
[jira] [Resolved] (KAFKA-5452) Aggressive log compaction ratio appears to have no negative effect on log-compacted topics
[ https://issues.apache.org/jira/browse/KAFKA-5452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-5452. Resolution: Fixed > Aggressive log compaction ratio appears to have no negative effect on > log-compacted topics > -- > > Key: KAFKA-5452 > URL: https://issues.apache.org/jira/browse/KAFKA-5452 > Project: Kafka > Issue Type: Improvement > Components: config, core, log >Affects Versions: 0.10.2.0, 0.10.2.1 > Environment: Ubuntu Trusty (14.04.5), Oracle JDK 8 >Reporter: Jeff Chao >Priority: Major > Labels: performance > Attachments: 200mbs-dirty0-dirty-1-dirty05.png, > flame-graph-200mbs-dirty0.png, flame-graph-200mbs-dirty0.svg > > > Some of our users are seeing unintuitive/unexpected behavior with > log-compacted topics where they receive multiple records for the same key > when consuming. This is a result of low throughput on log-compacted topics > such that conditions ({{min.cleanable.dirty.ratio = 0.5}}, default) aren't > met for compaction to kick in. > This prompted us to test and tune {{min.cleanable.dirty.ratio}} in our > clusters. It appears that having more aggressive log compaction ratios don't > have negative effects on CPU and memory utilization. If this is truly the > case, we should consider changing the default from {{0.5}} to something more > aggressive. > Setup: > # 8 brokers > # 5 zk nodes > # 32 partitions on a topic > # replication factor 3 > # log roll 3 hours > # log segment bytes 1 GB > # log retention 24 hours > # all messages to a single key > # all messages to a unique key > # all messages to a bounded key range [0, 999] > # {{min.cleanable.dirty.ratio}} per topic = {{0}}, {{0.5}}, and {{1}} > # 200 MB/s sustained, produce and consume traffic > Observations: > We were able to verify log cleaner threads were performing work by checking > the logs and verifying the {{cleaner-offset-checkpoint}} file for all topics. 
> We also observed the log cleaner's {{time-since-last-run-ms}} metric was > normal, never going above the default of 15 seconds. > Under-replicated partitions stayed steady, same for replication lag. > Here's an example test run where we try out {{min.cleanable.dirty.ratio = > 0}}, {{min.cleanable.dirty.ratio = 1}}, and {{min.cleanable.dirty.ratio = > 0.5}}. Troughs in between the peaks represent zero traffic and reconfiguring > of topics. > (200mbs-dirty-0-dirty1-dirty05.png attached) > !200mbs-dirty0-dirty-1-dirty05.png|thumbnail! > Memory utilization is fine, but more interestingly, CPU doesn't appear to > have much difference. > To get more detail, here is a flame graph (raw svg attached) of the run for > {{min.cleanable.dirty.ratio = 0}}. The conservative and default ratio flame > graphs are equivalent. > (flame-graph-200mbs-dirty0.png attached) > !flame-graph-200mbs-dirty0.png|thumbnail! > Notice that the majority of CPU is coming from: > # SSL operations (on reads/writes) > # KafkaApis::handleFetchRequest (ReplicaManager::fetchMessages) > # KafkaApis::handleOffsetFetchRequest > We also have examples from small scale test runs which show similar behavior > but with scaled down CPU usage. > It seems counterintuitive that there's no apparent difference in CPU whether > it be aggressive or conservative compaction ratios, so we'd like to get some > thoughts from the community. > We're looking for feedback on whether or not anyone else has experienced this > behavior before as well or, if CPU isn't affected, has anyone seen something > related instead. > If this is true, then we'd be happy to discuss further and provide a patch. -- This message was sent by Atlassian Jira (v8.20.10#820010)
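As a rough illustration of why low-throughput topics can keep duplicate keys for a long time, the sketch below computes a dirty ratio and compares it with `min.cleanable.dirty.ratio`. It is a simplification, not the broker's log cleaner: `DirtyRatioSketch` and its methods are hypothetical, the ratio is assumed to be dirty bytes divided by total (clean plus dirty) bytes, and the comparison is assumed to be strict.

```java
final class DirtyRatioSketch {
    // Fraction of the log that has not been compacted yet.
    static double dirtyRatio(final long cleanBytes, final long dirtyBytes) {
        final long total = cleanBytes + dirtyBytes;
        return total == 0 ? 0.0 : (double) dirtyBytes / total;
    }

    // Assumed rule: a log becomes eligible once its dirty ratio exceeds the threshold.
    static boolean eligibleForCompaction(final long cleanBytes,
                                         final long dirtyBytes,
                                         final double minCleanableDirtyRatio) {
        return dirtyRatio(cleanBytes, dirtyBytes) > minCleanableDirtyRatio;
    }
}
```

For example, a partition with 900 MB of clean data and 100 MB of new writes has a dirty ratio of 0.1, so under these assumptions it is not compacted at the default of `0.5` but is at `0`.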
[jira] [Resolved] (KAFKA-4106) Consumer / add configure method to PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-4106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-4106. Resolution: Fixed > Consumer / add configure method to PartitionAssignor interface > -- > > Key: KAFKA-4106 > URL: https://issues.apache.org/jira/browse/KAFKA-4106 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 0.10.0.1 >Reporter: Florian Hussonnois >Assignee: Jason Gustafson >Priority: Minor > > Currently, we can implement a custom PartitionAssignor which will forward > user data that will be used during the assignment protocol. For example, > data can be used to implement a rack-aware assignor. > However, currently we cannot dynamically configure a PartitionAssignor > instance. > It would be nice to add a method configure(Map<String, ?>) to the > PartitionAssignor interface. Then, this method will be invoked by the > KafkaConsumer on each assignor, as is done for deserializers. > The code modifications are pretty straightforward but involve modifying the > public interface PartitionAssignor. Does that mean this JIRA needs a KIP? > I can contribute to that improvement.
[jira] [Resolved] (KAFKA-8177) Allow for separate connect instances to have sink connectors with the same name
[ https://issues.apache.org/jira/browse/KAFKA-8177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8177. Resolution: Fixed > Allow for separate connect instances to have sink connectors with the same > name > --- > > Key: KAFKA-8177 > URL: https://issues.apache.org/jira/browse/KAFKA-8177 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Paul Whalen >Priority: Minor > Labels: connect > > If you have multiple Connect instances (either a single standalone or > distributed group of workers) running against the same Kafka cluster, the > connect instances cannot each have a sink connector with the same name and > still operate independently. This is because the consumer group ID used > internally for reading from the source topic(s) is entirely derived from the > connector's name: > [https://github.com/apache/kafka/blob/d0e436c471ba4122ddcc0f7a1624546f97c4a517/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java#L24] > The documentation of Connect implies to me that it supports "multi-tenancy," > that is, as long as... > * In standalone mode, the {{offset.storage.file.filename}} is not shared > between instances > * In distributed mode, {{group.id}} and {{config.storage.topic}}, > {{offset.storage.topic}}, and {{status.storage.topic}} are not the same > between instances > ... then the connect instances can operate completely independently without > fear of conflict. But the sink connector consumer group naming policy makes > this untrue. Obviously this can be achieved by uniquely naming connectors > across instances, but in some environments that could be a bit of a nuisance, > or a challenging policy to enforce. For instance, imagine a large group of > developers or data analysts all running their own standalone Connect to load > into a SQL database for their own analysis, or mirroring to > their own local cluster for testing. 
> The obvious solution is to allow supplying config that gives a Connect instance > some notion of identity, and to use that when creating the sink task consumer > group. Distributed mode already has this obviously ({{group.id}}), but it > would need to be added for standalone mode. Maybe {{instance.id}}? Given that > solution it seems like this would need a small KIP. > I could also imagine solving this problem through better documentation > ("ensure your connector names are unique!"), but having that subtlety doesn't > seem worth it to me. (Optionally) assigning identity to every Connect > instance seems strictly clearer, without any downside.
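The naming conflict and the proposed fix can be sketched as follows. This is an illustration only: `SinkGroupIdSketch` and both methods are hypothetical, the `connect-` prefix is assumed to match what the linked `SinkUtils` helper produces, and the `instance.id`-based format is invented here since the issue does not specify one.

```java
final class SinkGroupIdSketch {
    // Current behavior as described above: the group ID depends only on the connector name.
    static String currentGroupId(final String connectorName) {
        return "connect-" + connectorName;
    }

    // Hypothetical variant: an optional per-instance identity is mixed into the group ID.
    static String proposedGroupId(final String instanceId, final String connectorName) {
        if (instanceId == null || instanceId.isEmpty()) {
            // Without an identity, fall back to the existing naming.
            return currentGroupId(connectorName);
        }
        return "connect-" + instanceId + "-" + connectorName;
    }
}
```

Two standalone workers that both run a connector named `jdbc-sink` end up in the same consumer group under the current scheme and split the partitions between them, whereas distinct instance identities would keep their groups separate.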
[jira] [Reopened] (KAFKA-8177) Allow for separate connect instances to have sink connectors with the same name
[ https://issues.apache.org/jira/browse/KAFKA-8177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-8177: > Allow for separate connect instances to have sink connectors with the same > name > --- > > Key: KAFKA-8177 > URL: https://issues.apache.org/jira/browse/KAFKA-8177 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Paul Whalen >Priority: Minor > Labels: connect > > If you have multiple Connect instances (either a single standalone or > distributed group of workers) running against the same Kafka cluster, the > connect instances cannot each have a sink connector with the same name and > still operate independently. This is because the consumer group ID used > internally for reading from the source topic(s) is entirely derived from the > connector's name: > [https://github.com/apache/kafka/blob/d0e436c471ba4122ddcc0f7a1624546f97c4a517/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java#L24] > The documentation of Connect implies to me that it supports "multi-tenancy," > that is, as long as... > * In standalone mode, the {{offset.storage.file.filename}} is not shared > between instances > * In distributed mode, {{group.id}} and {{config.storage.topic}}, > {{offset.storage.topic}}, and {{status.storage.topic}} are not the same > between instances > ... then the connect instances can operate completely independently without > fear of conflict. But the sink connector consumer group naming policy makes > this untrue. Obviously this can be achieved by uniquely naming connectors > across instances, but in some environments that could be a bit of a nuisance, > or a challenging policy to enforce. For instance, imagine a large group of > developers or data analysts all running their own standalone Connect to load > into a SQL database for their own analysis, or mirroring to > their own local cluster for testing. 
> The obvious solution is to allow supplying config that gives a Connect instance > some notion of identity, and to use that when creating the sink task consumer > group. Distributed mode already has this obviously ({{group.id}}), but it > would need to be added for standalone mode. Maybe {{instance.id}}? Given that > solution it seems like this would need a small KIP. > I could also imagine solving this problem through better documentation > ("ensure your connector names are unique!"), but having that subtlety doesn't > seem worth it to me. (Optionally) assigning identity to every Connect > instance seems strictly clearer, without any downside.
[jira] [Reopened] (KAFKA-4187) Adding a flag to prefix topics with mirror maker
[ https://issues.apache.org/jira/browse/KAFKA-4187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-4187:

> Adding a flag to prefix topics with mirror maker
> ------------------------------------------------
>
>                 Key: KAFKA-4187
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4187
>             Project: Kafka
>          Issue Type: Improvement
>          Components: tools
>    Affects Versions: 0.8.2.1, 0.9.0.1, 0.10.0.0, 0.10.0.1
>            Reporter: Vincent Rischmann
>            Priority: Minor
>
> So I have a setup where I need to mirror our production cluster to our
> preproduction cluster, but can't use the original topic names.
> I've patched mirror maker to allow me to define a prefix for each topic and I
> basically prefix everything with mirror_. I'm wondering if there's interest
> for this feature upstream?
> I have a patch available for Kafka 0.9.0.1 (what I'm using) and from what
> I've seen it should apply well to Kafka 0.10.0.X too.
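The reporter's patch is not included in the message, so the following Java sketch only illustrates the idea of mapping a source topic to a prefixed destination topic. The class `TopicPrefixer` and its method are hypothetical names and are not MirrorMaker APIs.

```java
import java.util.Objects;

/** Illustrative sketch only; not the reporter's patch and not MirrorMaker code. */
final class TopicPrefixer {
    private final String prefix;

    TopicPrefixer(String prefix) {
        this.prefix = Objects.requireNonNull(prefix, "prefix");
    }

    /** Returns the destination topic name for a record read from sourceTopic. */
    String destinationTopic(String sourceTopic) {
        return prefix + sourceTopic;
    }

    public static void main(String[] args) {
        TopicPrefixer prefixer = new TopicPrefixer("mirror_");
        // A record consumed from "orders" would be produced to "mirror_orders".
        System.out.println(prefixer.destinationTopic("orders"));
    }
}
```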
[jira] [Resolved] (KAFKA-4187) Adding a flag to prefix topics with mirror maker
[ https://issues.apache.org/jira/browse/KAFKA-4187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-4187.
Resolution: Fixed

> Adding a flag to prefix topics with mirror maker
> ------------------------------------------------
>
>                 Key: KAFKA-4187
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4187
>             Project: Kafka
>          Issue Type: Improvement
>          Components: tools
>    Affects Versions: 0.8.2.1, 0.9.0.1, 0.10.0.0, 0.10.0.1
>            Reporter: Vincent Rischmann
>            Priority: Minor
[jira] [Resolved] (KAFKA-3117) Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance
[ https://issues.apache.org/jira/browse/KAFKA-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-3117.
Resolution: Fixed

> Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance
> ---------------------------------------------------------------
>
>                 Key: KAFKA-3117
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3117
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.9.0.0
>         Environment: oracle java764bit
> ubuntu 13.10
>            Reporter: edwardt
>            Assignee: Jason Gustafson
>            Priority: Major
>              Labels: newbie, test, transient-unit-test-failure
>
> java.lang.AssertionError: Expected partitions [topic-0, topic-1, topic2-0, topic2-1] but actually got [topic-0, topic-1]
>     at org.junit.Assert.fail(Assert.java:88)
>     at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:730)
>     at kafka.api.BaseConsumerTest.testAutoCommitOnRebalance(BaseConsumerTest.scala:125)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:22
[jira] [Reopened] (KAFKA-5452) Aggressive log compaction ratio appears to have no negative effect on log-compacted topics
[ https://issues.apache.org/jira/browse/KAFKA-5452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-5452:

> Aggressive log compaction ratio appears to have no negative effect on log-compacted topics
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5452
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5452
>             Project: Kafka
>          Issue Type: Improvement
>          Components: config, core, log
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: Ubuntu Trusty (14.04.5), Oracle JDK 8
>            Reporter: Jeff Chao
>            Priority: Major
>              Labels: performance
>         Attachments: 200mbs-dirty0-dirty-1-dirty05.png, flame-graph-200mbs-dirty0.png, flame-graph-200mbs-dirty0.svg
>
> Some of our users are seeing unintuitive/unexpected behavior with
> log-compacted topics where they receive multiple records for the same key
> when consuming. This is a result of low throughput on log-compacted topics
> such that conditions ({{min.cleanable.dirty.ratio = 0.5}}, default) aren't
> met for compaction to kick in.
> This prompted us to test and tune {{min.cleanable.dirty.ratio}} in our
> clusters. It appears that a more aggressive log compaction ratio doesn't
> have negative effects on CPU and memory utilization. If this is truly the
> case, we should consider changing the default from {{0.5}} to something more
> aggressive.
> Setup:
> # 8 brokers
> # 5 zk nodes
> # 32 partitions on a topic
> # replication factor 3
> # log roll 3 hours
> # log segment bytes 1 GB
> # log retention 24 hours
> # all messages to a single key
> # all messages to a unique key
> # all messages to a bounded key range [0, 999]
> # {{min.cleanable.dirty.ratio}} per topic = {{0}}, {{0.5}}, and {{1}}
> # 200 MB/s sustained, produce and consume traffic
> Observations:
> We were able to verify log cleaner threads were performing work by checking
> the logs and verifying the {{cleaner-offset-checkpoint}} file for all topics.
> We also observed the log cleaner's {{time-since-last-run-ms}} metric was
> normal, never going above the default of 15 seconds.
> Under-replicated partitions stayed steady, same for replication lag.
> Here's an example test run where we try out {{min.cleanable.dirty.ratio = 0}},
> {{min.cleanable.dirty.ratio = 1}}, and {{min.cleanable.dirty.ratio = 0.5}}.
> Troughs in between the peaks represent zero traffic and reconfiguring of
> topics.
> (200mbs-dirty-0-dirty1-dirty05.png attached)
> !200mbs-dirty0-dirty-1-dirty05.png|thumbnail!
> Memory utilization is fine, but more interestingly, CPU doesn't appear to
> show much difference.
> To get more detail, here is a flame graph (raw svg attached) of the run for
> {{min.cleanable.dirty.ratio = 0}}. The conservative and default ratio flame
> graphs are equivalent.
> (flame-graph-200mbs-dirty0.png attached)
> !flame-graph-200mbs-dirty0.png|thumbnail!
> Notice that the majority of CPU is coming from:
> # SSL operations (on reads/writes)
> # KafkaApis::handleFetchRequest (ReplicaManager::fetchMessages)
> # KafkaApis::handleOffsetFetchRequest
> We also have examples from small scale test runs which show similar behavior
> but with scaled down CPU usage.
> It seems counterintuitive that there's no apparent difference in CPU whether
> the compaction ratio is aggressive or conservative, so we'd like to get some
> thoughts from the community.
> We're looking for feedback on whether anyone else has experienced this
> behavior before or, if CPU isn't affected, whether anyone has seen something
> related instead.
> If this is true, then we'd be happy to discuss further and provide a patch.
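To make the role of {{min.cleanable.dirty.ratio}} in KAFKA-5452 concrete, here is a minimal Java sketch of the eligibility test. It assumes the dirty ratio is the uncleaned bytes divided by the total log bytes, and that a log qualifies when the ratio strictly exceeds the threshold; both the strictness of the comparison and the names `DirtyRatio`, `dirtyRatio`, and `eligibleForCompaction` are assumptions for illustration, not the broker's code.

```java
/** Illustrative sketch of the dirty-ratio test; not Kafka's log cleaner code. */
final class DirtyRatio {
    private DirtyRatio() {}

    /** Fraction of the log (in bytes) that has not been compacted yet. */
    static double dirtyRatio(long cleanBytes, long dirtyBytes) {
        long total = cleanBytes + dirtyBytes;
        return total == 0 ? 0.0 : (double) dirtyBytes / total;
    }

    /** Assumed rule: a non-empty log is eligible once its dirty ratio exceeds the threshold. */
    static boolean eligibleForCompaction(long cleanBytes, long dirtyBytes, double minCleanableDirtyRatio) {
        return cleanBytes + dirtyBytes > 0
                && dirtyRatio(cleanBytes, dirtyBytes) > minCleanableDirtyRatio;
    }

    public static void main(String[] args) {
        // A low-throughput topic: 900 bytes already cleaned, 100 bytes of new (dirty) data.
        System.out.println(eligibleForCompaction(900, 100, 0.5)); // default threshold
        System.out.println(eligibleForCompaction(900, 100, 0.0)); // aggressive threshold
    }
}
```

Under these assumptions the low-throughput topic stays uncompacted with the default of 0.5 and becomes eligible with a threshold of 0, which is the duplicate-key symptom the reporter describes.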
[jira] [Resolved] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic
[ https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-6014.
Resolution: Fixed

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6014
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6014
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>            Assignee: Jason Gustafson
>            Priority: Major
>
> New consumer throws an unexpected KafkaException when trying to commit to a
> topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to
> catch the KafkaException and just kills the process. We didn't see this in
> the old consumer because old consumer just silently drops failed offset
> commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
>     public static void main(String[] args) throws InterruptedException {
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
>         TopicPartition partition = new TopicPartition("t", 0);
>         List<TopicPartition> partitions = Collections.singletonList(partition);
>         kafkaConsumer.assign(partitions);
>         // Commit offset 0 once per second; the commit starts failing after topic t is deleted.
>         while (true) {
>             kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
>             Thread.sleep(1000);
>         }
>     }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] Offset commit failed on partition t-0 at offset 0: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition t-0 may not exist or user may not have Describe access to topic
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.j
[jira] [Reopened] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic
[ https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-6014:

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6014
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6014
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>            Assignee: Jason Gustafson
>            Priority: Major
[jira] [Reopened] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"
[ https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-3410:

> Unclean leader election and "Halting because log truncation is not allowed"
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3410
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3410
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>            Reporter: James Cheng
>            Priority: Major
>              Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shut down,
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because
> log truncation is not allowed for topic test, Current leader 1's latest
> offset 0 is less than replica 2's latest offset 151
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets
> dropped out of ISR. Broker1 is still the leader. I can still write data to
> the partition.
> 7. Shut down Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed
> because broker1 is down. At this point, the partition is offline. Can't write
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because
> log truncation is not allowed for topic test, Current leader 1's latest
> offset 0 is less than replica 2's latest offset 151
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my
> brokers.
> I'm trying to understand a couple of things:
> * In step 10, why is broker1 allowed to resume leadership even though it has
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one
> partition that is in this state? Wouldn't it be possible for the broker to
> continue to serve traffic for all the other topics, and just mark this one as
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker
> they want to become the new master? This would give me more control over how
> much data loss I am willing to handle. In this case, I would want broker2 to
> become the new master. Or, is that possible and I just don't know how to do
> it?
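The decision that step 10 of KAFKA-3410 runs into can be sketched as follows. This is a simplified Java model inferred from the FATAL message and the recovery note above (a follower that is ahead of the leader either truncates or halts, depending on `unclean.leader.election.enable`); the class `TruncationCheck`, the `Action` enum, and the method name are hypothetical, and the real `ReplicaFetcherThread` logic may differ in detail.

```java
/** Simplified model of the follower's choice; not the actual ReplicaFetcherThread code. */
final class TruncationCheck {
    private TruncationCheck() {}

    enum Action { FETCH, TRUNCATE, HALT }

    static Action onLeaderOffset(long leaderEndOffset, long replicaEndOffset,
                                 boolean uncleanLeaderElectionEnabled) {
        if (leaderEndOffset >= replicaEndOffset) {
            return Action.FETCH; // normal case: the follower is behind or level with the leader
        }
        // The follower holds data the leader does not have. Truncating would discard it,
        // which is only tolerated when unclean leader election is enabled.
        return uncleanLeaderElectionEnabled ? Action.TRUNCATE : Action.HALT;
    }

    public static void main(String[] args) {
        // Offsets from the FATAL log line: leader at 0, replica at 151.
        System.out.println(onLeaderOffset(0, 151, false));
        System.out.println(onLeaderOffset(0, 151, true));
    }
}
```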
[jira] [Reopened] (KAFKA-8255) Replica fetcher thread exits with OffsetOutOfRangeException
[ https://issues.apache.org/jira/browse/KAFKA-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-8255:

> Replica fetcher thread exits with OffsetOutOfRangeException
> ------------------------------------------------------------
>
>                 Key: KAFKA-8255
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8255
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Colin McCabe
>            Priority: Major
>
> Replica fetcher threads can exit with OffsetOutOfRangeException when the log
> start offset has advanced beyond the high water mark on the fetching broker.
> Example stack trace:
> {code}
> org.apache.kafka.common.KafkaException: Error processing data for partition __consumer_offsets-46 offset 18761
>     at kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$10.apply(AbstractFetcherThread.scala:335)
>     at kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$10.apply(AbstractFetcherThread.scala:294)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3.apply(AbstractFetcherThread.scala:294)
>     at kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3.apply(AbstractFetcherThread.scala:293)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:293)
>     at kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply(AbstractFetcherThread.scala:293)
>     at kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply(AbstractFetcherThread.scala:293)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:292)
>     at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:132)
>     at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:131)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 4808819 of partition __consumer_offsets-46 since it is larger than the high watermark 18761
> [2019-04-16 14:16:42,257] INFO [ReplicaFetcher replicaId=1001, leaderId=1003, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {code}
> It seems that we should not terminate the replica fetcher thread in this case.
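One way to read the closing suggestion of KAFKA-8255 is that a failure on a single partition should not stop the whole fetcher. The Java sketch below shows that idea only in outline: the interface `PartitionProcessor`, the class `FetcherIsolationSketch`, and the choice to collect failed partitions in a list are all hypothetical and are not taken from Kafka's `AbstractFetcherThread`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of per-partition error isolation; not Kafka's fetcher code. */
final class FetcherIsolationSketch {
    private FetcherIsolationSketch() {}

    /** Hypothetical stand-in for appending fetched data to one partition's log. */
    interface PartitionProcessor {
        void process(String partition, long fetchOffset);
    }

    /** Processes every partition; a failure marks only that partition as failed. */
    static List<String> processAll(Map<String, Long> fetchOffsets, PartitionProcessor processor) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : fetchOffsets.entrySet()) {
            try {
                processor.process(entry.getKey(), entry.getValue());
            } catch (RuntimeException e) {
                // Record the failure and keep going instead of letting the thread die.
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("__consumer_offsets-46", 18761L);
        offsets.put("__consumer_offsets-47", 100L);
        List<String> failed = processAll(offsets, (partition, offset) -> {
            if (partition.endsWith("-46")) {
                throw new IllegalStateException("log start offset is larger than the high watermark");
            }
        });
        System.out.println("failed partitions: " + failed);
    }
}
```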
[jira] [Reopened] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch
[ https://issues.apache.org/jira/browse/KAFKA-7698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-7698:

> Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7698
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7698
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Ming Liu
>            Priority: Major
>              Labels: easyfix
>
> During our operation of Kafka, we frequently saw this failure:
> There was an error in one of the threads during logs loading:
> org.apache.kafka.common.errors.ProducerFencedException:
> {code:java}
> [06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] Loading producer state from snapshot file '/data/disk5/kafka/interaction_events-127/092130764817.snapshot' (kafka.log.ProducerStateManager)
> [06:57:09,698] INFO [Log partition=interaction_events-127, dir=/data/disk5/kafka] Completed load of log with 11 segments, log start offset 91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
> [06:57:09,701] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
> [06:57:09,705] INFO [ProducerStateManager partition=client_interaction_events_authorid_enrichment-20] Writing producer snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
> [06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 63 (request epoch), 66 (server epoch)
> {code}
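The error text in KAFKA-7698 implies a comparison between the epoch carried by a producer's batch and the epoch the broker has recorded. A minimal Java sketch of that comparison follows; the class and method names are hypothetical, the exception type is a plain `IllegalStateException` rather than Kafka's `ProducerFencedException`, and the actual check in `ProducerStateManager` may include more conditions.

```java
/** Illustrative epoch comparison; not the broker's ProducerStateManager code. */
final class ProducerEpochCheck {
    private ProducerEpochCheck() {}

    /** A request is treated as fenced when its epoch is older than the one the server holds. */
    static boolean isFenced(short requestEpoch, short serverEpoch) {
        return requestEpoch < serverEpoch;
    }

    static void checkProducerEpoch(short requestEpoch, short serverEpoch) {
        if (isFenced(requestEpoch, serverEpoch)) {
            throw new IllegalStateException("Producer's epoch is no longer valid: "
                    + requestEpoch + " (request epoch), " + serverEpoch + " (server epoch)");
        }
    }

    public static void main(String[] args) {
        // Epochs from the log above: the batch carries 63, the server already holds 66.
        System.out.println(isFenced((short) 63, (short) 66));
    }
}
```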
[jira] [Resolved] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"
[ https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-3410.
Resolution: Fixed

> Unclean leader election and "Halting because log truncation is not allowed"
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3410
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3410
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>            Reporter: James Cheng
>            Priority: Major
>              Labels: reliability
[jira] [Resolved] (KAFKA-949) Integrate kafka into YARN
[ https://issues.apache.org/jira/browse/KAFKA-949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-949. --- Resolution: Fixed > Integrate kafka into YARN > - > > Key: KAFKA-949 > URL: https://issues.apache.org/jira/browse/KAFKA-949 > Project: Kafka > Issue Type: New Feature > Components: contrib >Affects Versions: 0.8.0 > Environment: hadoop 2-0.X >Reporter: Kam Kasravi >Priority: Major > > kafka is being added to bigtop (BIGTOP-989). Having kafka services available > under YARN will enable a number of cluster operations for kafka that YARN > handles. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-5734) Heap (Old generation space) gradually increase
[ https://issues.apache.org/jira/browse/KAFKA-5734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-5734.
Resolution: Fixed

> Heap (Old generation space) gradually increase
> ----------------------------------------------
>
>                 Key: KAFKA-5734
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5734
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics
>    Affects Versions: 0.10.2.0
>         Environment: ubuntu 14.04 / java 1.7.0
>            Reporter: jang
>            Priority: Major
>         Attachments: heap-log.xlsx, jconsole.png
>
> I set up a Kafka server on Ubuntu with 4 GB of RAM.
> The heap (old generation space) size increases gradually, as shown in the
> attached Excel file, which records GC info at 1-minute intervals.
> Eventually OU occupies 2.6 GB and GC spends too much time (followed by an
> out-of-memory exception).
> The Kafka process arguments are below.
> _java -Xmx3000M -Xms2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC
> -Djava.awt.headless=true
> -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
> -Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false
> -Dkafka.logs.dir=/usr/local/kafka/bin/../logs
> -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties_
[jira] [Reopened] (KAFKA-5734) Heap (Old generation space) gradually increase
[ https://issues.apache.org/jira/browse/KAFKA-5734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-5734:

> Heap (Old generation space) gradually increase
> ----------------------------------------------
>
>                 Key: KAFKA-5734
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5734
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics
>    Affects Versions: 0.10.2.0
>         Environment: ubuntu 14.04 / java 1.7.0
>            Reporter: jang
>            Priority: Major
>         Attachments: heap-log.xlsx, jconsole.png
[jira] [Resolved] (KAFKA-8622) Snappy Compression Not Working
[ https://issues.apache.org/jira/browse/KAFKA-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8622. Resolution: Fixed > Snappy Compression Not Working > -- > > Key: KAFKA-8622 > URL: https://issues.apache.org/jira/browse/KAFKA-8622 > Project: Kafka > Issue Type: Bug > Components: compression >Affects Versions: 2.3.0, 2.2.1 >Reporter: Kunal Verma >Assignee: kaushik srinivas >Priority: Major > > I am trying to produce a message to the broker with snappy compression > enabled. > Environment: > Brokers [Kafka cluster] are hosted on CentOS 7 > I downloaded the latest version tars (2.3.0 & 2.2.1), extracted them and moved them > to /opt/kafka- > I started the broker with the standard configuration. > In my producer service (written in Java), I have enabled snappy compression. > props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); > > When sending a record to the broker, I get the following error: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request > > While investigating further on the broker side, I found the following error in the log: > > logs/kafkaServer.out:java.lang.UnsatisfiedLinkError: > /tmp/snappy-1.1.7-ecd381af-ffdd-4a5c-a3d8-b802d0fa4e85-libsnappyjava.so: > /tmp/snappy-1.1.7-ecd381af-ffdd-4a5c-a3d8-b802d0fa4e85-libsnappyjava.so: > failed to map segment from shared object: Operation not permitted > -- > > [2019-07-02 15:29:43,399] ERROR [ReplicaManager broker=1] Error processing > append operation on partition test-bulk-1 (kafka.server.ReplicaManager) > java.lang.NoClassDefFoundError: Could not initialize class > org.xerial.snappy.Snappy > at > org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435) > at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466) > at java.io.DataInputStream.readByte(DataInputStream.java:265) > at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168) > at > 
org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:293) > at > org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:264) > at > org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:569) > at > org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:538) > at > org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:327) > at > scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:55) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at > kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:269) > at > kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1$adapted(LogValidator.scala:261) > at scala.collection.Iterator.foreach(Iterator.scala:941) > at scala.collection.Iterator.foreach$(Iterator.scala:941) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:73) > at kafka.log.Log.liftedTree1$1(Log.scala:881) > at kafka.log.Log.$anonfun$append$2(Log.scala:868) > at kafka.log.Log.maybeHandleIOException(Log.scala:2065) > at kafka.log.Log.append(Log.scala:850) > at kafka.log.Log.appendAsLeader(Log.scala:819) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) > at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763) > at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) > at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) > at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) > at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:149) > at scala.collection.TraversableLike.map(TraversableLike.scala:237) > at scala.collection.TraversableLike.map$(TraversableLike.scala:230) > at scala.collection.AbstractTraversa
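A note on the UnsatisfiedLinkError quoted in this report: "failed to map segment from shared object: Operation not permitted" is the message typically seen when the directory holding the extracted native library is mounted noexec. That is an assumption here, since the report does not show the mount options. To my understanding, snappy-java extracts libsnappyjava.so into the directory named by the org.xerial.snappy.tempdir system property and falls back to java.io.tmpdir. The sketch below only mirrors that lookup so the effective directory can be printed; the class and method names are hypothetical.

```java
import java.io.File;

// Hypothetical helper for illustration; not part of Kafka or snappy-java.
class SnappyTempDirCheck {

    // Mirrors the lookup order: the snappy-specific property wins,
    // otherwise the JVM-wide temp directory is used.
    static String resolveTempDir(String snappyTempDir, String javaTmpDir) {
        return snappyTempDir != null ? snappyTempDir : javaTmpDir;
    }

    public static void main(String[] args) {
        String dir = resolveTempDir(
                System.getProperty("org.xerial.snappy.tempdir"),
                System.getProperty("java.io.tmpdir"));
        File f = new File(dir);
        System.out.println("snappy native library would be extracted to: " + f.getAbsolutePath());
        // This reports only existence and write access; it cannot detect a noexec mount.
        System.out.println("exists=" + f.isDirectory() + " writable=" + f.canWrite());
    }
}
```

If that directory turns out to be on a noexec mount, starting the broker with -Dorg.xerial.snappy.tempdir pointing at a location that allows execution (for example through KAFKA_OPTS) is one commonly used workaround.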
[jira] [Reopened] (KAFKA-949) Integrate kafka into YARN
[ https://issues.apache.org/jira/browse/KAFKA-949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-949: --- > Integrate kafka into YARN > - > > Key: KAFKA-949 > URL: https://issues.apache.org/jira/browse/KAFKA-949 > Project: Kafka > Issue Type: New Feature > Components: contrib >Affects Versions: 0.8.0 > Environment: hadoop 2-0.X >Reporter: Kam Kasravi >Priority: Major > > kafka is being added to bigtop (BIGTOP-989). Having kafka services available > under YARN will enable a number of cluster operations for kafka that YARN > handles. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-8255) Replica fetcher thread exits with OffsetOutOfRangeException
[ https://issues.apache.org/jira/browse/KAFKA-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8255. Resolution: Fixed > Replica fetcher thread exits with OffsetOutOfRangeException > --- > > Key: KAFKA-8255 > URL: https://issues.apache.org/jira/browse/KAFKA-8255 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Colin McCabe >Priority: Major > > Replica fetcher threads can exit with OffsetOutOfRangeException when the log > start offset has advanced beyond the high watermark on the fetching broker. > Example stack trace: > {code} > org.apache.kafka.common.KafkaException: Error processing data for partition > __consumer_offsets-46 offset 18761 > at > kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$10.apply(AbstractFetcherThread.scala:335) > at > kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$10.apply(AbstractFetcherThread.scala:294) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3.apply(AbstractFetcherThread.scala:294) > at > kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3.apply(AbstractFetcherThread.scala:293) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:293) > at > kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply(AbstractFetcherThread.scala:293) > at > 
kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply(AbstractFetcherThread.scala:293) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:292) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 4808819 of partition __consumer_offsets-46 > since it is larger than the high watermark 18761 > [2019-04-16 14:16:42,257] INFO [ReplicaFetcher replicaId=1001, leaderId=1003, > fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread) > {code} > It seems that we should not terminate the replica fetcher thread in this case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
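The behavior suggested at the end of this report (do not terminate the replica fetcher thread) can be sketched as follows. This is a minimal illustration under stated assumptions, not Kafka's actual fix: the class, the method names, and the use of IllegalStateException in place of OffsetOutOfRangeException are all hypothetical. It shows the invariant from the trace (the log start offset may not pass the high watermark) and a loop that marks the offending partition as failed while continuing with the others.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names do not correspond to Kafka's actual classes.
class FetcherSketch {

    // Invariant from the stack trace: the log start offset may not exceed the high watermark.
    static void advanceLogStartOffset(String partition, long newLogStartOffset, long highWatermark) {
        if (newLogStartOffset > highWatermark) {
            throw new IllegalStateException("Cannot increment the log start offset to "
                    + newLogStartOffset + " of partition " + partition
                    + " since it is larger than the high watermark " + highWatermark);
        }
    }

    // Processes every partition; a failure marks that partition as failed
    // instead of ending the whole loop (and with it the fetcher thread).
    // Each map value is {newLogStartOffset, highWatermark}.
    static List<String> processAll(Map<String, long[]> partitions) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, long[]> e : partitions.entrySet()) {
            try {
                advanceLogStartOffset(e.getKey(), e.getValue()[0], e.getValue()[1]);
            } catch (IllegalStateException ex) {
                failed.add(e.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, long[]> partitions = new LinkedHashMap<>();
        // Offsets taken from the log message in the report.
        partitions.put("__consumer_offsets-46", new long[] {4808819L, 18761L});
        // A second, made-up partition that satisfies the invariant.
        partitions.put("__consumer_offsets-47", new long[] {10L, 20L});
        System.out.println("failed partitions: " + processAll(partitions));
    }
}
```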
[jira] [Reopened] (KAFKA-8622) Snappy Compression Not Working
[ https://issues.apache.org/jira/browse/KAFKA-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-8622: > Snappy Compression Not Working > -- > > Key: KAFKA-8622 > URL: https://issues.apache.org/jira/browse/KAFKA-8622 > Project: Kafka > Issue Type: Bug > Components: compression >Affects Versions: 2.3.0, 2.2.1 >Reporter: Kunal Verma >Assignee: kaushik srinivas >Priority: Major > > I am trying to produce a message to the broker with snappy compression > enabled. > Environment: > Brokers [Kafka cluster] are hosted on CentOS 7 > I downloaded the latest version tars (2.3.0 & 2.2.1), extracted them and moved them > to /opt/kafka- > I started the broker with the standard configuration. > In my producer service (written in Java), I have enabled snappy compression. > props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); > > When sending a record to the broker, I get the following error: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request > > While investigating further on the broker side, I found the following error in the log: > > logs/kafkaServer.out:java.lang.UnsatisfiedLinkError: > /tmp/snappy-1.1.7-ecd381af-ffdd-4a5c-a3d8-b802d0fa4e85-libsnappyjava.so: > /tmp/snappy-1.1.7-ecd381af-ffdd-4a5c-a3d8-b802d0fa4e85-libsnappyjava.so: > failed to map segment from shared object: Operation not permitted > -- > > [2019-07-02 15:29:43,399] ERROR [ReplicaManager broker=1] Error processing > append operation on partition test-bulk-1 (kafka.server.ReplicaManager) > java.lang.NoClassDefFoundError: Could not initialize class > org.xerial.snappy.Snappy > at > org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435) > at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466) > at java.io.DataInputStream.readByte(DataInputStream.java:265) > at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168) > at > 
org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:293) > at > org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:264) > at > org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:569) > at > org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:538) > at > org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:327) > at > scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:55) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at > kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:269) > at > kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1$adapted(LogValidator.scala:261) > at scala.collection.Iterator.foreach(Iterator.scala:941) > at scala.collection.Iterator.foreach$(Iterator.scala:941) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:73) > at kafka.log.Log.liftedTree1$1(Log.scala:881) > at kafka.log.Log.$anonfun$append$2(Log.scala:868) > at kafka.log.Log.maybeHandleIOException(Log.scala:2065) > at kafka.log.Log.append(Log.scala:850) > at kafka.log.Log.appendAsLeader(Log.scala:819) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) > at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763) > at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) > at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) > at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) > at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:149) > at scala.collection.TraversableLike.map(TraversableLike.scala:237) > at scala.collection.TraversableLike.map$(TraversableLike.scala:230) > at scala.collection.AbstractTraversable.map(Traversable.sc
[jira] [Resolved] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch
[ https://issues.apache.org/jira/browse/KAFKA-7698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-7698. Resolution: Fixed > Kafka Broker fail to start: ProducerFencedException thrown from > producerstatemanager.scala!checkProducerEpoch > -- > > Key: KAFKA-7698 > URL: https://issues.apache.org/jira/browse/KAFKA-7698 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Ming Liu >Priority: Major > Labels: easyfix > > During our operation of Kafka, we frequently saw this failure: > There was an error in one of the threads during logs loading: > org.apache.kafka.common.errors.ProducerFencedException: > {code:java} > [06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] > Loading producer state from snapshot file > '/data/disk5/kafka/interaction_events-127/092130764817.snapshot' > (kafka.log.ProducerStateManager) > [06:57:09,698] INFO [Log partition=interaction_events-127, > dir=/data/disk5/kafka] Completed load of log with 11 segments, log start > offset 91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log) > [06:57:09,701] ERROR There was an error in one of the threads during logs > loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's > epoch is no longer valid. There is probably another producer with a newer > epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager) > [06:57:09,705] INFO [ProducerStateManager > partition=client_interaction_events_authorid_enrichment-20] Writing producer > snapshot at offset 92418754384 (kafka.log.ProducerStateManager) > [06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer > startup. Prepare to shutdown (kafka.server.KafkaServer) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 
63 > (request epoch), 66 (server epoch) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
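The error text in this report compares two epochs. As a rough sketch of that comparison, assuming (as the message wording suggests) that a request is rejected when its epoch is older than the one the broker has recorded, and using hypothetical names rather than Kafka's own code:

```java
// Hypothetical sketch of the epoch comparison behind the error message; not Kafka's code.
class ProducerEpochSketch {

    // A request is treated as fenced when its epoch is older than the recorded server epoch.
    static boolean isFenced(int requestEpoch, int serverEpoch) {
        return requestEpoch < serverEpoch;
    }

    public static void main(String[] args) {
        // Values from the report: 63 (request epoch), 66 (server epoch).
        System.out.println("fenced: " + isFenced(63, 66));
    }
}
```

With the values from the report, the request epoch (63) is older than the server epoch (66), so the check trips. What stands out in this report is that the check fires while logs are being loaded at broker startup, and the failure is fatal to startup.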
[jira] [Reopened] (KAFKA-12563) Something wrong with MM2 metrics
[ https://issues.apache.org/jira/browse/KAFKA-12563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-12563: - > Something wrong with MM2 metrics > > > Key: KAFKA-12563 > URL: https://issues.apache.org/jira/browse/KAFKA-12563 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bui Thanh MInh >Priority: Major > Attachments: Screen Shot 2021-03-26 at 12.10.12.png > > > The metric > _*`adt_2dc_c1_kafka_connect_mirror_source_connector_replication_latency_ms_avg`*_ > shows a very large latency value even though the number of > messages in the two DCs is the same. > See the attachment for details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-12563) Something wrong with MM2 metrics
[ https://issues.apache.org/jira/browse/KAFKA-12563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-12563. - Resolution: Fixed > Something wrong with MM2 metrics > > > Key: KAFKA-12563 > URL: https://issues.apache.org/jira/browse/KAFKA-12563 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bui Thanh MInh >Priority: Major > Attachments: Screen Shot 2021-03-26 at 12.10.12.png > > > The metric > _*`adt_2dc_c1_kafka_connect_mirror_source_connector_replication_latency_ms_avg`*_ > shows a very large latency value even though the number of > messages in the two DCs is the same. > See the attachment for details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13372) failed authentication due to: SSL handshake failed
[ https://issues.apache.org/jira/browse/KAFKA-13372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-13372. - Resolution: Fixed > failed authentication due to: SSL handshake failed > -- > > Key: KAFKA-13372 > URL: https://issues.apache.org/jira/browse/KAFKA-13372 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.2.2 >Reporter: Maria Isabel Florez Rodriguez >Priority: Major > > Hi everyone, > > I have the next issue about authentication SCRAM + SSL. I’m using the CLI and > this is the version of my client (./kafka_2.13-2.8.1/bin/kafka-topics.sh). In > this example I will talk about list topics, but another operations (consumer, > producer) failed too. > > > First, let me describe the current scenario: > > * I have 5 Kafka servers with > * kafka-broker-0.mydomain.com > * kafka-broker-1.mydomain.com > * kafka-broker-2.mydomain.com > * kafka-broker-3.mydomain.com > * kafka-broker-4.mydomain.com > > * I have a DNS principal configured with Round Robin to IPs broker: > * kafka-broker-princial.mydomain.com (Round Robin) > > I have configured for each broker the next listeners (I'm using 3 ports): > {quote}advertised.listeners=SASL_SSL://kafka-broker-0.mydomain.com:9094,SASL_PLAINTEXT://kafka-broker-0.mydomain.com:9093,PLAINTEXT://kafka-broker-0.mydomain.com:9092{quote} > * 9092 for PLAINTEXT > * 9093 for SASL_PLAINTEXT > * 9094 for SASL_SSL > > My Kafka broker servers have the next config server.properties: > {quote}advertised.listeners=SASL_SSL://kafka-broker-X.mydomain.com:9094,SASL_PLAINTEXT://kafka-broker-X.mydomain.com:9093,PLAINTEXT://kafka-broker-X.mydomain.com:9092 > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer > auto.create.topics.enable=false > auto.leader.rebalance.enable=true > background.threads=10 > broker.id=X > broker.rack=us-east-1c > compression.type=producer > connections.max.idle.ms=270 > controlled.shutdown.enable=true > delete.topic.enable=true > 
host.name=localhost > leader.imbalance.check.interval.seconds=300 > leader.imbalance.per.broker.percentage=10 > listeners=SASL_SSL://0.0.0.0:9094,SASL_PLAINTEXT://0.0.0.0:9093,PLAINTEXT://0.0.0.0:9092 > log.cleaner.enable=true > log.dirs=/var/lib/kafka/log/data1,/var/lib/kafka/log/data2,/var/lib/kafka/log/data3 > log.retention.check.interval.ms=30 > log.retention.hours=336 > log.segment.bytes=1073741824 > message.max.bytes=112 > min.insync.replicas=2 > num.io.threads=8 > num.network.threads=3 > num.partitions=3 > num.recovery.threads.per.data.dir=1 > num.replica.fetchers=1 > offset.metadata.max.bytes=4096 > offsets.commit.timeout.ms=5000 > offsets.retention.minutes=129600 > offsets.topic.num.partitions=50 > offsets.topic.replication.factor=3 > port=9092 > queued.max.requests=500 > replica.fetch.min.bytes=1 > replica.fetch.wait.max.ms=500 > sasl.enabled.mechanisms=SCRAM-SHA-256,GSSAPI > sasl.kerberos.service.name=x > sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 > security.inter.broker.protocol=SASL_SSL > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > socket.send.buffer.bytes=102400 > ssl.client.auth=required > {{ssl.endpoint.identification.algorithm=""}} > ssl.enabled.protocols=TLSv1.2 > ssl.key.password= > ssl.keystore.location=/etc/ssl/default_keystore.jks > ssl.keystore.password= > ssl.truststore.location=/usr/lib/jvm/java-11-adoptopenjdk-hotspot/lib/security/cacerts > ssl.truststore.password= > ssl.truststore.type=JKS > super.users=User:x > zookeeper.connect=kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com > :2181,kafka-zk-X.mydomain.com:218/my-environment > zookeeper.connection.timeout.ms=6000 > zookeeper.sasl.client=false{quote} > > > I was trying the next things: > > * (/)*PLAINTEXT:* I can consume directly to broker to broker with port > *9092* (Using IP or dns broker) > * (/)*PLAINTEXT:* I also can consume directly to DNS principal configured > with Round Robin 
with port *9092* (Using DNS principal) > * (/)*SASL_SSL:* I can consume directly to broker to broker with port *9094* > (Using only dns broker due it needs to validate the certificate) > * (x)*SASL_SSL:* I cannot consume directly to DNS principal configured with > Round Robin with port *9094* > The issue is: * *(x)SASL_SSL(x):* I cannot consume directly to DNS principal > configured with Round Robin with port *9094*. Only I have the issue with I > try to connect directly to DNS principal. My certificates contains > permissions with all my subdomains under the domain. > * I have the next _file.config_ when that I use when I try to connect to > DNS principal. (Is the same file that I used for consume directly to broker >
[jira] [Reopened] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed
[ https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-4329: > The order of the parameters for creating the ZkUtils object is reversed > --- > > Key: KAFKA-4329 > URL: https://issues.apache.org/jira/browse/KAFKA-4329 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0, 0.10.0.1 > Environment: software platform >Reporter: Matt Wang >Priority: Critical > Labels: patch > Original Estimate: 24h > Remaining Estimate: 24h > > When creating the ZkUtils object, the zkSessionTimeOutMs and > zkConnectionTimeoutMs parameters are passed in reversed order. Although the default values of these > parameters are both 6000, this causes problems, especially when we want > to change these values. > The pull request is at: > https://github.com/apache/kafka/pull/1646 -- This message was sent by Atlassian Jira (v8.20.10#820010)
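The class of bug described in this report (two same-typed timeout arguments passed in the wrong order, hidden while both equal 6000) can be illustrated with a small sketch. Everything below is hypothetical: connect(...) is not the real ZkUtils signature, and the wrapper types are just one possible guard, not what the linked pull request does.

```java
// Hypothetical sketch; connect(...) is not the real ZkUtils signature.
class TimeoutOrderSketch {

    static final class SessionTimeoutMs {
        final int value;
        SessionTimeoutMs(int value) { this.value = value; }
    }

    static final class ConnectionTimeoutMs {
        final int value;
        ConnectionTimeoutMs(int value) { this.value = value; }
    }

    // Two plain ints: swapping the arguments compiles and goes unnoticed while both are 6000.
    static String connect(int sessionTimeoutMs, int connectionTimeoutMs) {
        return "session=" + sessionTimeoutMs + " connection=" + connectionTimeoutMs;
    }

    // Distinct wrapper types: swapping the arguments becomes a compile-time error.
    static String connect(SessionTimeoutMs session, ConnectionTimeoutMs connection) {
        return connect(session.value, connection.value);
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30000;
        int connectionTimeoutMs = 6000;
        // Bug pattern from the issue: arguments passed in the wrong order.
        System.out.println(connect(connectionTimeoutMs, sessionTimeoutMs));
        // Typed variant keeps each value attached to its meaning.
        System.out.println(connect(new SessionTimeoutMs(sessionTimeoutMs),
                new ConnectionTimeoutMs(connectionTimeoutMs)));
    }
}
```

The swap only becomes visible once the two values differ, which matches the report's remark that the problem shows up when the defaults are changed.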
[jira] [Reopened] (KAFKA-10352) Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
[ https://issues.apache.org/jira/browse/KAFKA-10352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-10352: - > Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel) > - > > Key: KAFKA-10352 > URL: https://issues.apache.org/jira/browse/KAFKA-10352 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Reporter: Seongbae Chang >Priority: Critical > > One of my Kafka brokers (3 in total, version 2.5.0) shut down suddenly. > Then the other brokers also shut down for similar reasons. > > The main cause of this problem is '*Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)* > *java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint*' > > I don't know why this error occurs or how to solve it. Please give me > some answers or comments about it. Thank you. > I have attached the content of log files such as kafkaServer.out, > log-cleaner.log > > kafkaServer.out > {code:java} > [2020-07-30 19:49:05,992] INFO [GroupMetadataManager brokerId=3] Removed 0 > expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)[2020-07-30 19:49:05,992] INFO > [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 > milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)[2020-07-30 > 19:56:48,080] ERROR Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel)java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at > sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) > at java.nio.file.Files.newByteChannel(Files.java:361) at > java.nio.file.Files.newByteChannel(Files.java:407) at > java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384) > at java.nio.file.Files.newInputStream(Files.java:152) at > java.nio.file.Files.newBufferedReader(Files.java:2784) at > java.nio.file.Files.newBufferedReader(Files.java:2816) at > kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87) > at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86) at > kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61) > at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$2(LogCleanerManager.scala:134) > at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:583) at > scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:597) at > scala.collection.mutable.ListBuffer.addAll(ListBuffer.scala:118) at > scala.collection.mutable.ListBuffer$.from(ListBuffer.scala:38) at > scala.collection.immutable.List$.from(List.scala:617) at > scala.collection.immutable.List$.from(List.scala:611) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > scala.collection.immutable.Iterable$.from(Iterable.scala:35) at > scala.collection.immutable.Iterable$.from(Iterable.scala:32) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > 
scala.collection.IterableOps.flatMap(Iterable.scala:674) at > scala.collection.IterableOps.flatMap$(Iterable.scala:674) at > scala.collection.AbstractIterable.flatMap(Iterable.scala:921) at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$1(LogCleanerManager.scala:132) > at > kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:140) > at > kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:171) > at > kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:168) > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:327) at > kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:314) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:303) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)[2020-07-30 > 19:56:48,083] WARN [ReplicaManager broker=3] Stopping serving replicas in dir > /tmp/kafka-logs (kafka.server.ReplicaManager)[2020-07-30 19:56:48,086] INFO > [ReplicaFetcherManager on broker 3] Removed fetcher for partitions > HashSet(__consumer_offsets-8, sbchang.test.partition-0, > __consumer_offsets-47, sbchang.test.partition-2, sbchang.test.header-2, > configtest-0, __ispossible-0, __consumer_offsets-32, __consumer_offsets-35, > temp-iot-0, __consum
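The report does not establish why cleaner-offset-checkpoint disappeared. One possibility worth ruling out is that the broker's data directory sits under /tmp, where many systems periodically delete files; that is an assumption, not something the logs confirm. The sketch below is a hypothetical pre-flight check that flags entries of a comma-separated log.dirs value located under a given temp directory. The class and method names are made up for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check; not part of Kafka.
class LogDirLocationCheck {

    // Returns the entries of a comma-separated log.dirs value that live under tmpDir.
    static List<String> dirsUnderTmp(String logDirs, String tmpDir) {
        Path tmp = Paths.get(tmpDir).toAbsolutePath().normalize();
        List<String> risky = new ArrayList<>();
        for (String dir : logDirs.split(",")) {
            String trimmed = dir.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            Path p = Paths.get(trimmed).toAbsolutePath().normalize();
            // Path.startsWith compares whole path components, so /tmpfoo is not matched.
            if (p.startsWith(tmp)) {
                risky.add(trimmed);
            }
        }
        return risky;
    }

    public static void main(String[] args) {
        // The first path is from the report; the second is a made-up contrast case.
        String logDirs = "/tmp/kafka-logs,/var/lib/kafka/data";
        System.out.println("log dirs under /tmp: " + dirsUnderTmp(logDirs, "/tmp"));
    }
}
```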
[jira] [Resolved] (KAFKA-10352) Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
[ https://issues.apache.org/jira/browse/KAFKA-10352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-10352. - Resolution: Fixed > Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel) > - > > Key: KAFKA-10352 > URL: https://issues.apache.org/jira/browse/KAFKA-10352 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Reporter: Seongbae Chang >Priority: Critical > > One of my Kafka brokers (3 in total, version 2.5.0) shut down suddenly. > Then the other brokers also shut down for similar reasons. > > The main cause of this problem is '*Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)* > *java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint*' > > I don't know why this error occurs or how to solve it. Please give me > some answers or comments about it. Thank you. > I have attached the content of log files such as kafkaServer.out, > log-cleaner.log > > kafkaServer.out > {code:java} > [2020-07-30 19:49:05,992] INFO [GroupMetadataManager brokerId=3] Removed 0 > expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)[2020-07-30 19:49:05,992] INFO > [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 > milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)[2020-07-30 > 19:56:48,080] ERROR Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel)java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at > sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) > at java.nio.file.Files.newByteChannel(Files.java:361) at > java.nio.file.Files.newByteChannel(Files.java:407) at > java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384) > at java.nio.file.Files.newInputStream(Files.java:152) at > java.nio.file.Files.newBufferedReader(Files.java:2784) at > java.nio.file.Files.newBufferedReader(Files.java:2816) at > kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87) > at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86) at > kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61) > at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$2(LogCleanerManager.scala:134) > at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:583) at > scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:597) at > scala.collection.mutable.ListBuffer.addAll(ListBuffer.scala:118) at > scala.collection.mutable.ListBuffer$.from(ListBuffer.scala:38) at > scala.collection.immutable.List$.from(List.scala:617) at > scala.collection.immutable.List$.from(List.scala:611) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > scala.collection.immutable.Iterable$.from(Iterable.scala:35) at > scala.collection.immutable.Iterable$.from(Iterable.scala:32) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > 
scala.collection.IterableOps.flatMap(Iterable.scala:674) at > scala.collection.IterableOps.flatMap$(Iterable.scala:674) at > scala.collection.AbstractIterable.flatMap(Iterable.scala:921) at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$1(LogCleanerManager.scala:132) > at > kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:140) > at > kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:171) > at > kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:168) > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:327) at > kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:314) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:303) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)[2020-07-30 > 19:56:48,083] WARN [ReplicaManager broker=3] Stopping serving replicas in dir > /tmp/kafka-logs (kafka.server.ReplicaManager)[2020-07-30 19:56:48,086] INFO > [ReplicaFetcherManager on broker 3] Removed fetcher for partitions > HashSet(__consumer_offsets-8, sbchang.test.partition-0, > __consumer_offsets-47, sbchang.test.partition-2, sbchang.test.header-2, > configtest-0, __ispossible-0, __consumer_offsets-32, __consumer_offsets-35,
[jira] [Resolved] (KAFKA-14398) Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers
[ https://issues.apache.org/jira/browse/KAFKA-14398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-14398. - Resolution: Fixed > Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers > > > Key: KAFKA-14398 > URL: https://issues.apache.org/jira/browse/KAFKA-14398 > Project: Kafka > Issue Type: Improvement > Components: kraft, unit tests > Reporter: Proven Provenzano > Assignee: Proven Provenzano > Priority: Major > Fix For: 3.4.0 > > > KRAFT is a replacement for ZK for storing metadata. > We should validate that ACLs work with KRAFT for the supported authentication > mechanisms. > I will update EndToEndAuthorizerTest.scala to test with ZK and KRAFT. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed
[ https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-4329. Resolution: Fixed > The order of the parameters for creating the ZkUtils object is reversed > --- > > Key: KAFKA-4329 > URL: https://issues.apache.org/jira/browse/KAFKA-4329 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 0.10.0.0, 0.10.0.1 > Environment: software platform > Reporter: Matt Wang > Priority: Critical > Labels: patch > Original Estimate: 24h > Remaining Estimate: 24h > > When creating the ZkUtils object, the zkSessionTimeOutMs and > zkConnectionTimeoutMs parameters are passed in reversed order. Because the default values of these > parameters are both 6000, the swap goes unnoticed by default, but it causes problems as soon as we > override these values. > The pull request is at: > https://github.com/apache/kafka/pull/1646
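The swap described in KAFKA-4329 can be illustrated with a minimal Java sketch. It is not the real ZkUtils API (ZkUtils is Scala code in Kafka core): the class `TimeoutOrderSketch` and the method `describe` are hypothetical, and the override value 30000 is an assumption chosen only for illustration. The sketch shows why the reversed order stays invisible while both timeouts share the default 6000 and only surfaces once one of them is changed.

```java
// Hypothetical stand-in for illustration; this is NOT the real ZkUtils API.
class TimeoutOrderSketch {

    // Two adjacent int parameters: the compiler cannot detect a swapped call.
    static String describe(int sessionTimeoutMs, int connectionTimeoutMs) {
        return "session=" + sessionTimeoutMs + "ms, connection=" + connectionTimeoutMs + "ms";
    }

    public static void main(String[] args) {
        int zkSessionTimeoutMs = 6000;    // default mentioned in the issue
        int zkConnectionTimeoutMs = 6000; // default mentioned in the issue

        // With equal defaults, a swapped call looks exactly like a correct one.
        System.out.println(describe(zkConnectionTimeoutMs, zkSessionTimeoutMs));

        // Assumed override (illustrative value): the swap now applies it to the wrong setting.
        zkSessionTimeoutMs = 30000;
        System.out.println("swapped: " + describe(zkConnectionTimeoutMs, zkSessionTimeoutMs));
        System.out.println("correct: " + describe(zkSessionTimeoutMs, zkConnectionTimeoutMs));
    }
}
```

One common way to guard against this class of bug is to wrap each timeout in its own small type, so that a swapped call no longer compiles; whether that fits the code base in question is a separate design decision.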
[jira] [Reopened] (KAFKA-14398) Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers
[ https://issues.apache.org/jira/browse/KAFKA-14398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-14398: - > Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers > > > Key: KAFKA-14398 > URL: https://issues.apache.org/jira/browse/KAFKA-14398 > Project: Kafka > Issue Type: Improvement > Components: kraft, unit tests > Reporter: Proven Provenzano > Assignee: Proven Provenzano > Priority: Major > Fix For: 3.4.0 > > > KRAFT is a replacement for ZK for storing metadata. > We should validate that ACLs work with KRAFT for the supported authentication > mechanisms. > I will update EndToEndAuthorizerTest.scala to test with ZK and KRAFT.
[jira] [Reopened] (KAFKA-13372) failed authentication due to: SSL handshake failed
[ https://issues.apache.org/jira/browse/KAFKA-13372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-13372: - > failed authentication due to: SSL handshake failed > -- > > Key: KAFKA-13372 > URL: https://issues.apache.org/jira/browse/KAFKA-13372 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.2.2 >Reporter: Maria Isabel Florez Rodriguez >Priority: Major > > Hi everyone, > > I have the next issue about authentication SCRAM + SSL. I’m using the CLI and > this is the version of my client (./kafka_2.13-2.8.1/bin/kafka-topics.sh). In > this example I will talk about list topics, but another operations (consumer, > producer) failed too. > > > First, let me describe the current scenario: > > * I have 5 Kafka servers with > * kafka-broker-0.mydomain.com > * kafka-broker-1.mydomain.com > * kafka-broker-2.mydomain.com > * kafka-broker-3.mydomain.com > * kafka-broker-4.mydomain.com > > * I have a DNS principal configured with Round Robin to IPs broker: > * kafka-broker-princial.mydomain.com (Round Robin) > > I have configured for each broker the next listeners (I'm using 3 ports): > {quote}advertised.listeners=SASL_SSL://kafka-broker-0.mydomain.com:9094,SASL_PLAINTEXT://kafka-broker-0.mydomain.com:9093,PLAINTEXT://kafka-broker-0.mydomain.com:9092{quote} > * 9092 for PLAINTEXT > * 9093 for SASL_PLAINTEXT > * 9094 for SASL_SSL > > My Kafka broker servers have the next config server.properties: > {quote}advertised.listeners=SASL_SSL://kafka-broker-X.mydomain.com:9094,SASL_PLAINTEXT://kafka-broker-X.mydomain.com:9093,PLAINTEXT://kafka-broker-X.mydomain.com:9092 > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer > auto.create.topics.enable=false > auto.leader.rebalance.enable=true > background.threads=10 > broker.id=X > broker.rack=us-east-1c > compression.type=producer > connections.max.idle.ms=270 > controlled.shutdown.enable=true > delete.topic.enable=true > host.name=localhost > 
leader.imbalance.check.interval.seconds=300 > leader.imbalance.per.broker.percentage=10 > listeners=SASL_SSL://0.0.0.0:9094,SASL_PLAINTEXT://0.0.0.0:9093,PLAINTEXT://0.0.0.0:9092 > log.cleaner.enable=true > log.dirs=/var/lib/kafka/log/data1,/var/lib/kafka/log/data2,/var/lib/kafka/log/data3 > log.retention.check.interval.ms=30 > log.retention.hours=336 > log.segment.bytes=1073741824 > message.max.bytes=112 > min.insync.replicas=2 > num.io.threads=8 > num.network.threads=3 > num.partitions=3 > num.recovery.threads.per.data.dir=1 > num.replica.fetchers=1 > offset.metadata.max.bytes=4096 > offsets.commit.timeout.ms=5000 > offsets.retention.minutes=129600 > offsets.topic.num.partitions=50 > offsets.topic.replication.factor=3 > port=9092 > queued.max.requests=500 > replica.fetch.min.bytes=1 > replica.fetch.wait.max.ms=500 > sasl.enabled.mechanisms=SCRAM-SHA-256,GSSAPI > sasl.kerberos.service.name=x > sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 > security.inter.broker.protocol=SASL_SSL > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > socket.send.buffer.bytes=102400 > ssl.client.auth=required > {{ssl.endpoint.identification.algorithm=""}} > ssl.enabled.protocols=TLSv1.2 > ssl.key.password= > ssl.keystore.location=/etc/ssl/default_keystore.jks > ssl.keystore.password= > ssl.truststore.location=/usr/lib/jvm/java-11-adoptopenjdk-hotspot/lib/security/cacerts > ssl.truststore.password= > ssl.truststore.type=JKS > super.users=User:x > zookeeper.connect=kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com > :2181,kafka-zk-X.mydomain.com:218/my-environment > zookeeper.connection.timeout.ms=6000 > zookeeper.sasl.client=false{quote} > > > I was trying the next things: > > * (/)*PLAINTEXT:* I can consume directly to broker to broker with port > *9092* (Using IP or dns broker) > * (/)*PLAINTEXT:* I also can consume directly to DNS principal configured > with Round Robin with port *9092* (Using 
DNS principal) > * (/)*SASL_SSL:* I can consume directly to broker to broker with port *9094* > (Using only dns broker due it needs to validate the certificate) > * (x)*SASL_SSL:* I cannot consume directly to DNS principal configured with > Round Robin with port *9094* > The issue is: * *(x)SASL_SSL(x):* I cannot consume directly to DNS principal > configured with Round Robin with port *9094*. Only I have the issue with I > try to connect directly to DNS principal. My certificates contains > permissions with all my subdomains under the domain. > * I have the next _file.config_ when that I use when I try to connect to > DNS principal. (Is the same file that I used for consume directly to broker > to broker with port 9
[jira] [Resolved] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-10704. - Resolution: Fixed > Mirror maker with TLS at target > --- > > Key: KAFKA-10704 > URL: https://issues.apache.org/jira/browse/KAFKA-10704 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Tushar Bhasme >Assignee: Ning Zhang >Priority: Critical > > We need to setup mirror maker from a single node kafka cluster to a three > node Strimzi cluster. There is no SSL setup at source, however the target > cluster is configured with MTLS. > With below config, commands from source like listing topics etc are working: > {code:java} > cat client-ssl.properties > security.protocol=SSL > ssl.truststore.location=my.truststore > ssl.truststore.password=123456 > ssl.keystore.location=my.keystore > ssl.keystore.password=123456 > ssl.key.password=password{code} > However, we are not able to get mirror maker working with the similar configs: > {code:java} > source.security.protocol=PLAINTEXT > target.security.protocol=SSL > target.ssl.truststore.location=my.truststore > target.ssl.truststore.password=123456 > target.ssl.keystore.location=my.keystore > target.ssl.keystore.password=123456 > target.ssl.key.password=password{code} > Errors while running mirror maker: > {code:java} > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out > at 1605011994643 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. 
Call: fetchMetadata > [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 > unregistered (org.apache.kafka.common.utils.AppInfoParser:83) > [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata > update failed > (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235) > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) > timed out at 9223372036854775807 after 1attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient > thread has exited. Call: fetchMetadata > [2020-11-10 12:40:24,644] INFO Metrics scheduler closed > (org.apache.kafka.common.metrics.Metrics:668) > [2020-11-10 12:40:24,644] INFO Closing reporter > org.apache.kafka.common.metrics.JmxReporter > (org.apache.kafka.common.metrics.Metrics:672) > [2020-11-10 12:40:24,644] INFO Metrics reporters closed > (org.apache.kafka.common.metrics.Metrics:678) > [2020-11-10 12:40:24,645] ERROR Stopping due to error > (org.apache.kafka.connect.mirror.MirrorMaker:304) > org.apache.kafka.connect.errors.ConnectException: Failed to connect to and > describe Kafka cluster. Check worker's broker connection and security > properties. 
> at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51) > at > org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235) > at > org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136) > at java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136) > at > org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148) > at > org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291) > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, > deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out > at 1605012024642 after 1 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64) > ... 7 more > Caused by: org.apache.kafka.common.errors.TimeoutException: > Call(callName=listNodes, deadlineMs=1605012024641, tries=1, > nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. Call: listNodes > {code}
[jira] [Reopened] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-10704: - > Mirror maker with TLS at target > --- > > Key: KAFKA-10704 > URL: https://issues.apache.org/jira/browse/KAFKA-10704 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Tushar Bhasme >Assignee: Ning Zhang >Priority: Critical > > We need to setup mirror maker from a single node kafka cluster to a three > node Strimzi cluster. There is no SSL setup at source, however the target > cluster is configured with MTLS. > With below config, commands from source like listing topics etc are working: > {code:java} > cat client-ssl.properties > security.protocol=SSL > ssl.truststore.location=my.truststore > ssl.truststore.password=123456 > ssl.keystore.location=my.keystore > ssl.keystore.password=123456 > ssl.key.password=password{code} > However, we are not able to get mirror maker working with the similar configs: > {code:java} > source.security.protocol=PLAINTEXT > target.security.protocol=SSL > target.ssl.truststore.location=my.truststore > target.ssl.truststore.password=123456 > target.ssl.keystore.location=my.keystore > target.ssl.keystore.password=123456 > target.ssl.key.password=password{code} > Errors while running mirror maker: > {code:java} > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out > at 1605011994643 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. 
Call: fetchMetadata > [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 > unregistered (org.apache.kafka.common.utils.AppInfoParser:83) > [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata > update failed > (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235) > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) > timed out at 9223372036854775807 after 1attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient > thread has exited. Call: fetchMetadata > [2020-11-10 12:40:24,644] INFO Metrics scheduler closed > (org.apache.kafka.common.metrics.Metrics:668) > [2020-11-10 12:40:24,644] INFO Closing reporter > org.apache.kafka.common.metrics.JmxReporter > (org.apache.kafka.common.metrics.Metrics:672) > [2020-11-10 12:40:24,644] INFO Metrics reporters closed > (org.apache.kafka.common.metrics.Metrics:678) > [2020-11-10 12:40:24,645] ERROR Stopping due to error > (org.apache.kafka.connect.mirror.MirrorMaker:304) > org.apache.kafka.connect.errors.ConnectException: Failed to connect to and > describe Kafka cluster. Check worker's broker connection and security > properties. 
> at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51) > at > org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235) > at > org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136) > at java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136) > at > org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148) > at > org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291) > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, > deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out > at 1605012024642 after 1 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64) > ... 7 more > Caused by: org.apache.kafka.common.errors.TimeoutException: > Call(callName=listNodes, deadlineMs=1605012024641, tries=1, > nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. Call: listNodes > {code}
[jira] [Assigned] (KAFKA-5870) Idempotent producer: a producerId reset causes undesirable behavior for inflight batches to other partitions
[ https://issues.apache.org/jira/browse/KAFKA-5870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-5870: -- Assignee: (was: Apurva Mehta) > Idempotent producer: a producerId reset causes undesirable behavior for > inflight batches to other partitions > > > Key: KAFKA-5870 > URL: https://issues.apache.org/jira/browse/KAFKA-5870 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Priority: Major > Labels: exactly-once > > Currently, if we have to reset the producer id for any reason (for instance > if batches to a partition get expired, if we get an > {{OutOfOrderSequenceException}}, etc) we could cause batches to other > --healthy-- partitions to fail with a spurious > {{OutOfOrderSequenceException}}. > This is detailed in this PR discussion: > https://github.com/apache/kafka/pull/3743#discussion_r137907630 > Ideally, we would want all inflight batches to be handled to completion > rather than potentially failing them prematurely. Further, since we want to > tighten up the semantics of the {{OutOfOrderSequenceException}}, at the very > least we should raise another exception in this case, because there is no > data loss on the broker when the client gives up.
[jira] [Assigned] (KAFKA-5543) We don't remove the LastStableOffsetLag metric when a partition is moved away from a broker
[ https://issues.apache.org/jira/browse/KAFKA-5543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-5543: -- Assignee: (was: Apurva Mehta) > We don't remove the LastStableOffsetLag metric when a partition is moved away > from a broker > --- > > Key: KAFKA-5543 > URL: https://issues.apache.org/jira/browse/KAFKA-5543 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Priority: Major > > Reported by [~junrao], we have a small leak where the `LastStableOffsetLag` > metric is not removed along with the other metrics in the > `Partition.removeMetrics` method. This could create a leak when partitions > are reassigned or a topic is deleted.
[jira] [Assigned] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients
[ https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-2939: -- Assignee: (was: Mani Jindal) > Make AbstractConfig.logUnused() tunable for clients > --- > > Key: KAFKA-2939 > URL: https://issues.apache.org/jira/browse/KAFKA-2939 > Project: Kafka > Issue Type: Improvement > Components: config >Reporter: Guozhang Wang >Priority: Major > Labels: newbie > > Today we always log unused configs in KafkaProducer / KafkaConsumer in their > constructors, however for some cases like Kafka Streams that make use of > these clients, other configs may be passed in to configure Partitioner / > Serializer classes, etc. So it would be better to make this function call > optional to avoid printing unnecessary and confusing WARN entries.
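The request in KAFKA-2939 can be sketched roughly as follows. This is a simplified, hypothetical stand-in, not Kafka's `AbstractConfig`: the class `UnusedConfigSketch`, the `logUnusedEnabled` switch, the `my.partitioner.setting` key and the exact warning text are all assumptions made for illustration. The idea is only that a config holder tracks which keys were read and emits WARN entries for the rest when, and only when, the caller has opted in.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for a config holder; NOT Kafka's AbstractConfig.
class UnusedConfigSketch {
    private final Map<String, Object> originals;
    private final Set<String> used = new HashSet<>();
    private final boolean logUnusedEnabled; // hypothetical switch, assumed for this sketch

    UnusedConfigSketch(Map<String, Object> originals, boolean logUnusedEnabled) {
        this.originals = new TreeMap<>(originals); // sorted copy for stable output
        this.logUnusedEnabled = logUnusedEnabled;
    }

    // Reading a key marks it as used.
    Object get(String key) {
        used.add(key);
        return originals.get(key);
    }

    // Returns the WARN lines that would be emitted; empty when the switch is off.
    List<String> logUnused() {
        List<String> warnings = new ArrayList<>();
        if (!logUnusedEnabled) {
            return warnings;
        }
        for (String key : originals.keySet()) {
            if (!used.contains(key)) {
                warnings.add("WARN The configuration '" + key + "' was supplied but was not used.");
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new TreeMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        // Hypothetical key meant for a pluggable class, never read by the client itself.
        props.put("my.partitioner.setting", "42");

        UnusedConfigSketch noisy = new UnusedConfigSketch(props, true);
        noisy.get("bootstrap.servers");
        System.out.println(noisy.logUnused()); // one WARN entry for my.partitioner.setting

        UnusedConfigSketch quiet = new UnusedConfigSketch(props, false);
        quiet.get("bootstrap.servers");
        System.out.println(quiet.logUnused()); // no entries
    }
}
```

A wrapper such as Kafka Streams could then construct its embedded clients with the switch turned off, while standalone producers and consumers keep the warning.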
[jira] [Assigned] (KAFKA-5053) Send Error messages along with FindCoordinatorResponse
[ https://issues.apache.org/jira/browse/KAFKA-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-5053: -- Assignee: (was: Apurva Mehta) > Send Error messages along with FindCoordinatorResponse > -- > > Key: KAFKA-5053 > URL: https://issues.apache.org/jira/browse/KAFKA-5053 > Project: Kafka > Issue Type: Improvement > Reporter: Apurva Mehta > Priority: Major > > As mentioned in > https://github.com/apache/kafka/pull/2825#discussion_r110535855, currently it > is hard to debug issues when we get an error in the > `FindCoordinatorResponse`. It would help if the server populated the response > with a suitable message. > With the transaction coordinator, this need becomes more acute, so we should > try to emulate the CreateTopicsResponse and return a usable message along > with the error.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1117568502 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; Review Comment: Actually, I remember why I didn't initially put it in the storage package. Moving it there wouldn't be a simple change, because the TSM relies on the PartitionFetchState case class in AbstractFetcherThread.scala. That would have created a circular dependency across modules, which I wanted to avoid. @junrao, what do you think about keeping it in the core module in this PR?
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2
guozhangwang commented on code in PR #13300: URL: https://github.com/apache/kafka/pull/13300#discussion_r1117540370 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java: ## @@ -863,45 +864,15 @@ public static void addTotalCountAndSumMetricsToSensor(final Sensor sensor, ); } -public static void maybeMeasureLatency(final Runnable actionToMeasure, Review Comment: Moved these util functions to `AbstractTask` since we need them for both standby tasks and active tasks now.
[jira] [Created] (KAFKA-14758) Extract inner classes from Fetcher for reuse in refactoring
Kirk True created KAFKA-14758: - Summary: Extract inner classes from Fetcher for reuse in refactoring Key: KAFKA-14758 URL: https://issues.apache.org/jira/browse/KAFKA-14758 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Kirk True Assignee: Kirk True The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{Fetcher}}. This task includes refactoring {{Fetcher}} by extracting the inner classes into top-level classes (though still in {{internal}}) so that those classes can be referenced by forthcoming refactored fetch logic.
[jira] [Resolved] (KAFKA-14717) KafkaStreams can' get running if the rebalance happens before StreamThread gets shutdown completely
[ https://issues.apache.org/jira/browse/KAFKA-14717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-14717. - Resolution: Fixed > KafkaStreams can' get running if the rebalance happens before StreamThread > gets shutdown completely > --- > > Key: KAFKA-14717 > URL: https://issues.apache.org/jira/browse/KAFKA-14717 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 3.5.0 > > > I noticed this issue when tracing KAFKA-7109 > StreamThread closes the consumer before changing state to DEAD. If the > partition rebalance happens quickly, the other StreamThreads can't change > KafkaStream state from REBALANCING to RUNNING since there is a > PENDING_SHUTDOWN StreamThread
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2
guozhangwang commented on code in PR #13300: URL: https://github.com/apache/kafka/pull/13300#discussion_r1117540370 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java: ## @@ -863,45 +864,15 @@ public static void addTotalCountAndSumMetricsToSensor(final Sensor sensor, ); } -public static void maybeMeasureLatency(final Runnable actionToMeasure, Review Comment: Moved these util functions to `AbstractTask` since we need them for both standby tasks and active tasks now. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -986,6 +987,11 @@ private void prepareChangelogs(final Set newPartitionsToResto } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); } + +final TaskId taskId = changelogs.get(partition).stateManager.taskId(); +final StreamTask task = (StreamTask) tasks.get(taskId); +final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); +task.recordRestoreRemaining(time, recordsToRestore); Review Comment: The logic for measuring remaining records is a bit complex: we first aggregate the total amount of records to restore across all changelog partitions at the beginning when initializing the changelogs; and then during restoration we keep decrementing by the number of restored records.
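The two-phase bookkeeping described in the review comment above (aggregate once at initialization, then decrement during restoration) might look roughly like the following Java sketch. The class `RestoreRemainingSketch` and its method names are hypothetical and are not the classes touched by the PR. The `Math.max(..., 0L)` guard at initialization mirrors the quoted diff, while clamping the counter at zero on decrement is an extra assumption of this sketch.

```java
// Hypothetical sketch of a "records remaining to restore" counter; names are NOT from the PR.
class RestoreRemainingSketch {
    private long remaining = 0L;

    // Phase 1: called once per changelog partition while initializing the changelogs.
    void addChangelog(long startOffset, long restoreEndOffset) {
        // Guard against an end offset that is behind the start offset, as in the quoted diff.
        remaining += Math.max(restoreEndOffset - startOffset, 0L);
    }

    // Phase 2: called after each restored batch.
    void onBatchRestored(long numRecords) {
        // Clamping at zero is an assumption of this sketch, not something the PR states.
        remaining = Math.max(remaining - numRecords, 0L);
    }

    long remaining() {
        return remaining;
    }

    public static void main(String[] args) {
        RestoreRemainingSketch metric = new RestoreRemainingSketch();
        metric.addChangelog(10L, 110L); // contributes 100 records
        metric.addChangelog(50L, 40L);  // end behind start: contributes 0
        metric.onBatchRestored(30L);
        System.out.println(metric.remaining());
    }
}
```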
[jira] [Resolved] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart
[ https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-14442. - Resolution: Fixed > GlobalKTable restoration waits requestTimeout during application restart > > > Key: KAFKA-14442 > URL: https://issues.apache.org/jira/browse/KAFKA-14442 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.0.0 > Reporter: Gergo L��p > Priority: Major > Fix For: 3.2.0 > > > Using "exactly_once_beta", the highWatermark "skips" an offset after a > transaction, but in this case the global .checkpoint file contains a different > value (smaller by 1) than the highWatermark. > During restoration, because of the difference between the checkpoint and the > highWatermark, a poll will be attempted, but sometimes there is no new record > on the partition and the GlobalStreamThread has to wait for the > requestTimeout to continue. > If there is any new record on the partition, the problem does not occur.
[jira] [Reopened] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart
[ https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-14442: - > GlobalKTable restoration waits requestTimeout during application restart > > > Key: KAFKA-14442 > URL: https://issues.apache.org/jira/browse/KAFKA-14442 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.0.0 > Reporter: Gergo L��p > Priority: Major > Fix For: 3.2.0 > > > Using "exactly_once_beta", the highWatermark "skips" an offset after a > transaction, but in this case the global .checkpoint file contains a different > value (smaller by 1) than the highWatermark. > During restoration, because of the difference between the checkpoint and the > highWatermark, a poll will be attempted, but sometimes there is no new record > on the partition and the GlobalStreamThread has to wait for the > requestTimeout to continue. > If there is any new record on the partition, the problem does not occur.
[jira] [Reopened] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException
[ https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-9250: > Kafka streams stuck in rebalancing state after UnknownHostException > --- > > Key: KAFKA-9250 > URL: https://issues.apache.org/jira/browse/KAFKA-9250 > Project: Kafka > Issue Type: Bug > Components: clients, config, network, streams >Affects Versions: 2.2.0 >Reporter: Vijay >Priority: Critical > > We are using kafka streams (2.2.0) application for reading messages from > source topic do some transformation and send to destination topic. > Application started fine and processed messages till it encountered > UnknownHostException, after which application is hung in rebalancing state > and not processing messages. > > Below are the properties we have configured : > application.id = * > bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3" > num.stream.threads=3 > replication.factor=3 > num.standby.replicas=1 > max.block.ms=8640 > acks=all > auto.offset.reset=earliest > processing.guarantee=exactly_once > > Additional details. > Number of brokers - 3 > Source topic partition count - 12 and replication factor of 3 > Destination topic partition count - 12 and replication factor of 3 > 4 instances of stream application are deployed in docker containers. 
> > Below are the some of the logs : > {noformat} > [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] > o.a.k.clients.NetworkClient - [Consumer > clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer, > groupId=streams-example] Error connecting to node hostname1:port1 (id: > 2147438464 rack: null) > java.net.UnknownHostException: hostname1 > at java.net.InetAddress.getAllByName0(InetAddress.java:1280) > at java.net.InetAddress.getAllByName(InetAddress.java:1192) > at java.net.InetAddress.getAllByName(InetAddress.java:1126) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387) > > at > org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121) > > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917) > > at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235) > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:850) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774){noformat} > > > > {noformat} > [ERROR] [kafka-producer-netwo
[jira] [Resolved] (KAFKA-4706) Unify StreamsKafkaClient instances
[ https://issues.apache.org/jira/browse/KAFKA-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-4706.
Resolution: Fixed
> Unify StreamsKafkaClient instances
> --
>
> Key: KAFKA-4706
> URL: https://issues.apache.org/jira/browse/KAFKA-4706
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Matthias J. Sax
> Assignee: Sharad
> Priority: Minor
> Labels: beginner, easyfix, newbie
> Fix For: 1.1.0
>
>
> Kafka Streams currently uses two instances of {{StreamsKafkaClient}} (one in {{KafkaStreams}} and one in {{InternalTopicManager}}).
> We want to unify both such that only a single instance is used.
--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-3576) Unify KStream and KTable API
[ https://issues.apache.org/jira/browse/KAFKA-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reopened KAFKA-3576:
> Unify KStream and KTable API
>
>
> Key: KAFKA-3576
> URL: https://issues.apache.org/jira/browse/KAFKA-3576
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Damian Guy
> Priority: Major
> Labels: api
> Fix For: 0.10.1.0
>
>
> KTable aggregations follow the pattern {{table.groupBy(...).aggregate(...)}}, and the data is repartitioned through an internal topic based on the key selected in {{groupBy(...)}}.
> KStream aggregations, though, follow the pattern {{stream.selectKey(...).through(...).aggregateByKey(...)}}. In other words, users need to manually route the data through a topic to repartition it, and the syntax differs slightly from KTable's as well.
> h2. Goal
> To have similar APIs for aggregations of KStream and KTable
[jira] [Resolved] (KAFKA-3576) Unify KStream and KTable API
[ https://issues.apache.org/jira/browse/KAFKA-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-3576.
Resolution: Fixed
> Unify KStream and KTable API
>
>
> Key: KAFKA-3576
> URL: https://issues.apache.org/jira/browse/KAFKA-3576
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Damian Guy
> Priority: Major
> Labels: api
> Fix For: 0.10.1.0
>
>
> KTable aggregations follow the pattern {{table.groupBy(...).aggregate(...)}}, and the data is repartitioned through an internal topic based on the key selected in {{groupBy(...)}}.
> KStream aggregations, though, follow the pattern {{stream.selectKey(...).through(...).aggregateByKey(...)}}. In other words, users need to manually route the data through a topic to repartition it, and the syntax differs slightly from KTable's as well.
> h2. Goal
> To have similar APIs for aggregations of KStream and KTable
[jira] [Resolved] (KAFKA-13599) Upgrade RocksDB to 6.27.3
[ https://issues.apache.org/jira/browse/KAFKA-13599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-13599.
-
Resolution: Fixed
> Upgrade RocksDB to 6.27.3
> -
>
> Key: KAFKA-13599
> URL: https://issues.apache.org/jira/browse/KAFKA-13599
> Project: Kafka
> Issue Type: Task
> Components: streams
> Reporter: Jonathan Albrecht
> Assignee: Jonathan Albrecht
> Priority: Major
> Fix For: 3.2.0
>
> Attachments: compat_report.html
>
>
> RocksDB v6.27.3 has been released and it is the first release to support s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle without s390x support.
> RocksDB v6.27.3 has added some new options that require an update to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java but no other changes are needed to upgrade.
> A compatibility report is attached for the current version 6.22.1.1 -> 6.27.3
[jira] [Reopened] (KAFKA-4706) Unify StreamsKafkaClient instances
[ https://issues.apache.org/jira/browse/KAFKA-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reopened KAFKA-4706:
> Unify StreamsKafkaClient instances
> --
>
> Key: KAFKA-4706
> URL: https://issues.apache.org/jira/browse/KAFKA-4706
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Matthias J. Sax
> Assignee: Sharad
> Priority: Minor
> Labels: beginner, easyfix, newbie
> Fix For: 1.1.0
>
>
> Kafka Streams currently uses two instances of {{StreamsKafkaClient}} (one in {{KafkaStreams}} and one in {{InternalTopicManager}}).
> We want to unify both such that only a single instance is used.
[jira] [Reopened] (KAFKA-13599) Upgrade RocksDB to 6.27.3
[ https://issues.apache.org/jira/browse/KAFKA-13599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reopened KAFKA-13599:
-
> Upgrade RocksDB to 6.27.3
> -
>
> Key: KAFKA-13599
> URL: https://issues.apache.org/jira/browse/KAFKA-13599
> Project: Kafka
> Issue Type: Task
> Components: streams
> Reporter: Jonathan Albrecht
> Assignee: Jonathan Albrecht
> Priority: Major
> Fix For: 3.2.0
>
> Attachments: compat_report.html
>
>
> RocksDB v6.27.3 has been released and it is the first release to support s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle without s390x support.
> RocksDB v6.27.3 has added some new options that require an update to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java but no other changes are needed to upgrade.
> A compatibility report is attached for the current version 6.22.1.1 -> 6.27.3
[jira] [Resolved] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException
[ https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-9250.
Resolution: Fixed
> Kafka streams stuck in rebalancing state after UnknownHostException
> ---
>
> Key: KAFKA-9250
> URL: https://issues.apache.org/jira/browse/KAFKA-9250
> Project: Kafka
> Issue Type: Bug
> Components: clients, config, network, streams
> Affects Versions: 2.2.0
> Reporter: Vijay
> Priority: Critical
>
> We are using a Kafka Streams (2.2.0) application to read messages from a source topic, apply some transformations, and send them to a destination topic.
> The application started fine and processed messages until it encountered an UnknownHostException, after which the application has been stuck in the rebalancing state and is not processing messages.
>
> Below are the properties we have configured:
> application.id = *
> bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
> num.stream.threads=3
> replication.factor=3
> num.standby.replicas=1
> max.block.ms=8640
> acks=all
> auto.offset.reset=earliest
> processing.guarantee=exactly_once
>
> Additional details:
> Number of brokers - 3
> Source topic partition count - 12 and replication factor of 3
> Destination topic partition count - 12 and replication factor of 3
> 4 instances of the Streams application are deployed in Docker containers.
> > Below are the some of the logs : > {noformat} > [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] > o.a.k.clients.NetworkClient - [Consumer > clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer, > groupId=streams-example] Error connecting to node hostname1:port1 (id: > 2147438464 rack: null) > java.net.UnknownHostException: hostname1 > at java.net.InetAddress.getAllByName0(InetAddress.java:1280) > at java.net.InetAddress.getAllByName(InetAddress.java:1192) > at java.net.InetAddress.getAllByName(InetAddress.java:1126) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387) > > at > org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121) > > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917) > > at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235) > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:850) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774){noformat} > > > > {noformat} > [ERROR]
[GitHub] [kafka] guozhangwang opened a new pull request, #13300: KAFKA-10199: Add task updater metrics, part 2
guozhangwang opened a new pull request, #13300:
URL: https://github.com/apache/kafka/pull/13300

1. Added task-level metrics:
   * restore-rate, restore-total (active)
   * update-rate, update-total (standby)
   * restore-remaining-records-total (active)
2. Fixed some naming confusion in the XXXMetrics classes, especially in distinguishing sensor-name constructs from metric-name constructs.
3. Added related unit tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14757) Kafka Cooperative Sticky Assignor results in significant duplicate consumption
[ https://issues.apache.org/jira/browse/KAFKA-14757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693323#comment-17693323 ]
Philip Nee commented on KAFKA-14757:
I mean, the point of onPartitionsRevoked is to ensure progress is saved upon partition change. So, if you decide to manage the offsets yourself, you should always implement onPartitionsRevoked.
> Kafka Cooperative Sticky Assignor results in significant duplicate consumption
> --
>
> Key: KAFKA-14757
> URL: https://issues.apache.org/jira/browse/KAFKA-14757
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.1.1
> Environment: AWS MSK (broker) and Spring Kafka (2.8.7) for use in Spring Boot consumers.
> Reporter: Siddharth Anand
> Priority: Critical
>
> Details may be found within the linked document:
> [Kafka Cooperative Sticky Assignor Issue : Duplicate Consumption | [https://docs.google.com/document/d/1E7qAwGOpF8jo_YhF4NwUx9CXxUGJmT8OhHEqIg7-GfI/edit?usp=sharing]]
> In a nutshell, we noticed that the Cooperative Sticky Assignor resulted in significant duplicate message consumption. During last year's F1 Grand Prix events and World Cup soccer events, our company's Kafka-based platform received live traffic. This live traffic, coupled with autoscaled consumers, resulted in as much as 70% duplicate message consumption at the Kafka consumers.
> In December 2022, we ran a synthetic load test to confirm that duplicate message consumption occurs during consumer scale out/in and Kafka partition rebalancing when using the Cooperative Sticky Assignor. This issue does not occur when using the Range Assignor.
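The point in the comment above, saving progress for partitions that are about to be taken away, might look roughly like the sketch below. It is a library-free model, not Kafka client code: `OffsetStore`, `RevocationAwareTracker`, and `RevocationDemo` are hypothetical names, partitions are plain integers, and the `onPartitionsRevoked` method here only mirrors the role of `ConsumerRebalanceListener#onPartitionsRevoked`. In a real consumer the commit would go through `KafkaConsumer#commitSync` or the application's own offset store.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for wherever offsets are persisted.
interface OffsetStore {
    void commit(Map<Integer, Long> offsets);
}

// Tracks the next offset to read per partition and flushes the positions
// of revoked partitions before another consumer takes them over.
class RevocationAwareTracker {
    private final Map<Integer, Long> nextOffsets = new HashMap<>();
    private final OffsetStore store;

    RevocationAwareTracker(OffsetStore store) {
        this.store = store;
    }

    // Call once a record has been fully processed.
    void recordProcessed(int partition, long offset) {
        // The committed position is the offset of the next record to read.
        nextOffsets.put(partition, offset + 1);
    }

    // Mirrors the role of the rebalance callback: commit only what is revoked.
    void onPartitionsRevoked(Collection<Integer> revoked) {
        Map<Integer, Long> toCommit = new HashMap<>();
        for (int partition : revoked) {
            Long next = nextOffsets.remove(partition);
            if (next != null) {
                toCommit.put(partition, next);
            }
        }
        if (!toCommit.isEmpty()) {
            store.commit(toCommit);
        }
    }
}

class RevocationDemo {
    public static void main(String[] args) {
        Map<Integer, Long> committed = new HashMap<>();
        RevocationAwareTracker tracker = new RevocationAwareTracker(committed::putAll);
        tracker.recordProcessed(0, 41L);
        tracker.recordProcessed(1, 7L);
        tracker.onPartitionsRevoked(List.of(0));
        System.out.println(committed); // only partition 0 has been committed
    }
}
```

Without the flush in `onPartitionsRevoked`, the new owner of partition 0 would resume from the last position committed earlier and reprocess everything after it, which is one way duplicate consumption can appear during a rebalance.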
[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1444268607

Just a quick note on this PR: it seems we need to be more deliberate about handling the timeout, because non-retriable errors are always expected to be thrown (except for the 4 cases), which is why the change broke 60-ish tests. Updating the PR to retrigger the tests.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs
gharris1727 commented on code in PR #13291:
URL: https://github.com/apache/kafka/pull/13291#discussion_r1117488058

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java: ##
@@ -266,6 +267,7 @@ public void testPollBoundary() throws Exception {
 props.put(NAME_CONFIG, CONNECTOR_NAME);
 props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
 props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+props.put(THROUGHPUT_CONFIG, Integer.toString(recordsProduced));

Review Comment:
Since the results of Integer.toString(100) and Long.toString(100L) are the same, I don't think this is necessary. The reason I reused the same variable was that I wanted to keep the runtime of the test constant. If there were two variables, someone could tune one while holding the other constant until the test timed out. I agree that `recordsProduced` is a poor name, because this test produces many more records than that under normal conditions. Do you have a better name in mind?
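Both claims in the review comment above can be checked with a few lines of plain Java. This is only a sketch: `ThrottleArithmetic` and `secondsPerPoll` are hypothetical names, and the arithmetic assumes the throttle is expressed in records per second, which is an assumption here rather than something the comment states.

```java
// Hypothetical helper; not part of the Kafka Connect test code.
class ThrottleArithmetic {
    // Approximate seconds needed to emit one poll's worth of records when
    // the source is throttled to throughputPerSec records per second.
    static double secondsPerPoll(long messagesPerPoll, long throughputPerSec) {
        return (double) messagesPerPoll / throughputPerSec;
    }

    public static void main(String[] args) {
        int recordsProduced = 100;
        // The int and long string forms are identical, so either conversion
        // yields the same config value.
        System.out.println(Integer.toString(recordsProduced).equals(Long.toString(100L)));
        // Driving both settings from one variable keeps the ratio fixed,
        // whatever value the variable is tuned to.
        System.out.println(secondsPerPoll(recordsProduced, recordsProduced));
        System.out.println(secondsPerPoll(4 * recordsProduced, 4 * recordsProduced));
        // With two independent variables the ratio can drift.
        System.out.println(secondsPerPoll(4 * recordsProduced, recordsProduced));
    }
}
```

Under that assumption, tuning a single shared variable never changes the time per poll, whereas raising only the per-poll count stretches it, which matches the concern about a test drifting toward its timeout.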
[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses
[ https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693312#comment-17693312 ]
Matthias J. Sax commented on KAFKA-14747:
-
Talking to [~guozhang], it seems we don't need a KIP :) – I removed the label.
> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Koma Zhang
> Priority: Minor
> Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is updated, a subscription is sent to the right-hand side (including a hash value of the left-hand side record), and the right-hand side might send back join responses (also including the original hash). The left-hand side only processes the responses if the returned hash matches the current hash of the left-hand side record, because a different hash implies that the left-hand side record was updated in the meantime (including sending a new subscription to the right-hand side), and thus the data is stale and the response should not be processed (joining the response to the new record could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a response, which might be dropped if the left-hand side record was updated in parallel.
> While the behavior is correct, we don't record if this happens. We should consider recording this using the existing "dropped record" sensor or maybe adding a new sensor.
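The hash check described in the issue, together with the counting that the ticket asks for, can be modelled in a few lines. This is a minimal sketch and not the Kafka Streams implementation: `FkJoinResponseFilter` and its methods are hypothetical, `String.hashCode()` stands in for whatever hash the real join uses, and a plain counter stands in for a "dropped record" sensor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the left-hand side of a foreign-key join.
class FkJoinResponseFilter {
    private final Map<String, String> leftTable = new HashMap<>();
    private long droppedResponses = 0; // stand-in for a "dropped record" sensor

    // Illustrative hash only; the real join uses its own hash function.
    static int hashOf(String value) {
        return value == null ? 0 : value.hashCode();
    }

    // Updates the left-hand side record and returns the hash that would
    // travel with the subscription sent to the right-hand side.
    int updateLeft(String key, String value) {
        leftTable.put(key, value);
        return hashOf(value);
    }

    // Handles a response carrying the hash of the record that triggered it.
    // Returns the joined value, or null if the response was stale and dropped.
    String onResponse(String key, int originalHash, String rightValue) {
        String current = leftTable.get(key);
        if (hashOf(current) != originalHash) {
            droppedResponses++; // the record changed in the meantime
            return null;
        }
        return current + "|" + rightValue;
    }

    long droppedResponses() {
        return droppedResponses;
    }
}

class FkJoinDemo {
    public static void main(String[] args) {
        FkJoinResponseFilter left = new FkJoinResponseFilter();
        int oldHash = left.updateLeft("k1", "v1");
        int newHash = left.updateLeft("k1", "v2"); // update races with the first response
        System.out.println(left.onResponse("k1", oldHash, "r")); // stale, so dropped
        System.out.println(left.onResponse("k1", newHash, "r")); // current, so joined
        System.out.println(left.droppedResponses());
    }
}
```

The only change relative to the behavior the issue calls correct is the counter increment on the stale path, which is the piece the ticket proposes to record.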
[jira] [Updated] (KAFKA-14747) FK join should record discarded subscription responses
[ https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-14747:
Labels: beginner newbie (was: beginner needs-kip newbie)
> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Koma Zhang
> Priority: Minor
> Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is updated, a subscription is sent to the right-hand side (including a hash value of the left-hand side record), and the right-hand side might send back join responses (also including the original hash). The left-hand side only processes the responses if the returned hash matches the current hash of the left-hand side record, because a different hash implies that the left-hand side record was updated in the meantime (including sending a new subscription to the right-hand side), and thus the data is stale and the response should not be processed (joining the response to the new record could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a response, which might be dropped if the left-hand side record was updated in parallel.
> While the behavior is correct, we don't record if this happens. We should consider recording this using the existing "dropped record" sensor or maybe adding a new sensor.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs
gharris1727 commented on code in PR #13291:
URL: https://github.com/apache/kafka/pull/13291#discussion_r1117479545

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java: ##
@@ -81,6 +81,7 @@
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.THROUGHPUT_CONFIG;

Review Comment:
I think that would make sense if the underlying configuration was `throughput.msgs.per.sec`, but it is currently `throughput`. I preferred to keep the existing name instead of renaming and aliasing the configuration, just to keep this PR small. Do you think renaming the configuration is important here?