(kafka) branch test-fix-dedicated-mirror-integration-test-multi-node-cluster created (now f8418d4fdea)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch test-fix-dedicated-mirror-integration-test-multi-node-cluster
in repository https://gitbox.apache.org/repos/asf/kafka.git

      at f8418d4fdea Repeat testMultiNodeCluster case 25 times

This branch includes the following new commits:

     new f8418d4fdea Repeat testMultiNodeCluster case 25 times

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
(kafka) 01/01: Repeat testMultiNodeCluster case 25 times
cegerton pushed a commit to branch test-fix-dedicated-mirror-integration-test-multi-node-cluster
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f8418d4fdea1b989356b74e7e73462ecfa9cd8aa
Author: Chris Egerton
AuthorDate: Fri Jul 5 20:31:01 2024 -0400

    Repeat testMultiNodeCluster case 25 times
---
 .../connect/mirror/integration/DedicatedMirrorIntegrationTest.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
index 0a838632460..49ee36a8fcf 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.test.NoRetryException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -194,7 +195,7 @@ public class DedicatedMirrorIntegrationTest {
      * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters">KIP-710</a>
      * for more detail on the necessity for this test case.
      */
-    @Test
+    @RepeatedTest(25)
     public void testMultiNodeCluster() throws Exception {
         Properties brokerProps = new Properties();
         brokerProps.put("transaction.state.log.replication.factor", "1");
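The commit above swaps `@Test` for JUnit 5's `@RepeatedTest(25)`, which runs the same test method 25 times and is a common way to check whether a flaky test still fails intermittently. The idea can be sketched without any test framework, as below. `RepeatHarness`, `countFailures`, and the sample check are hypothetical names used only for illustration; they are not part of the Kafka test suite or of JUnit.

```java
import java.util.function.IntPredicate;

public class RepeatHarness {
    /**
     * Runs the given check the requested number of times and returns how many runs failed.
     * The check receives the 1-based repetition number and returns true on success.
     * Hypothetical helper for illustration only.
     */
    static int countFailures(int repetitions, IntPredicate check) {
        int failures = 0;
        for (int i = 1; i <= repetitions; i++) {
            boolean passed;
            try {
                passed = check.test(i);
            } catch (RuntimeException e) {
                // Treat an unexpected exception as a failed repetition
                passed = false;
            }
            if (!passed) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Hypothetical flaky check: fails on every fifth repetition
        int failures = countFailures(25, i -> i % 5 != 0);
        System.out.println("failures: " + failures + " of 25");
    }
}
```

A single green run says little about a test that fails intermittently; counting failures over a fixed number of repetitions gives a rough failure rate to compare before and after a fix.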
(kafka) branch fix-dedicated-mirror-integration-test-multi-node-cluster updated (fd520ef3e02 -> 82d48bd7f72)
cegerton pushed a change to branch fix-dedicated-mirror-integration-test-multi-node-cluster
in repository https://gitbox.apache.org/repos/asf/kafka.git

 discard fd520ef3e02 MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations
     add 82d48bd7f72 MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (fd520ef3e02)
            \
             N -- N -- N   refs/heads/fix-dedicated-mirror-integration-test-multi-node-cluster (82d48bd7f72)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 .../kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java | 1 +
 1 file changed, 1 insertion(+)
(kafka) branch fix-dedicated-mirror-integration-test-multi-node-cluster updated (e4f483232d6 -> fd520ef3e02)
cegerton pushed a change to branch fix-dedicated-mirror-integration-test-multi-node-cluster
in repository https://gitbox.apache.org/repos/asf/kafka.git

 discard e4f483232d6 MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations
    omit 27b0958466e fix CI compile error
     add 72a47cc07b6 Fix compilation error (#16526)
     add fd520ef3e02 MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations

This update added new revisions after undoing existing revisions
(a --force push from e4f483232d6 to fd520ef3e02).

No new revisions were added by this update.

Summary of changes:
(kafka) branch trunk updated: Fix compilation error (#16526)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 72a47cc07b6 Fix compilation error (#16526) 72a47cc07b6 is described below commit 72a47cc07b666bc168b390242d5301bb5d5c33cb Author: TaiJuWu AuthorDate: Fri Jul 5 04:48:55 2024 +0800 Fix compilation error (#16526) Reviewers: Chia-Ping Tsai , Chris Egerton --- .../kafka/coordinator/group/assignor/SimpleAssignorTest.java | 9 + 1 file changed, 9 insertions(+) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java index f15bde38469..0553db6a487 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java @@ -97,6 +97,7 @@ public class SimpleAssignorTest { Map members = Collections.singletonMap( MEMBER_A, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), Collections.emptySet(), Assignment.EMPTY @@ -134,6 +135,7 @@ public class SimpleAssignorTest { Map members = Collections.singletonMap( MEMBER_A, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), mkSet(TOPIC_2_UUID), Assignment.EMPTY @@ -169,12 +171,14 @@ public class SimpleAssignorTest { Map members = new TreeMap<>(); members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), mkSet(TOPIC_1_UUID, TOPIC_3_UUID), Assignment.EMPTY )); members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), mkSet(TOPIC_1_UUID, TOPIC_3_UUID), Assignment.EMPTY @@ -230,12 +234,14 @@ public class SimpleAssignorTest { Map members = new TreeMap<>(); 
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), mkSet(TOPIC_1_UUID, TOPIC_2_UUID), Assignment.EMPTY )); members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), mkSet(TOPIC_3_UUID), Assignment.EMPTY @@ -243,6 +249,7 @@ public class SimpleAssignorTest { String memberC = "C"; members.put(memberC, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), mkSet(TOPIC_2_UUID, TOPIC_3_UUID), Assignment.EMPTY @@ -295,12 +302,14 @@ public class SimpleAssignorTest { Map members = new TreeMap<>(); members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), mkSet(TOPIC_1_UUID, TOPIC_2_UUID), Assignment.EMPTY )); members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( +Optional.empty(), Optional.empty(), Collections.emptySet(), Assignment.EMPTY
(kafka) branch fix-dedicated-mirror-integration-test-multi-node-cluster updated (a87223e0e5b -> e4f483232d6)
cegerton pushed a change to branch fix-dedicated-mirror-integration-test-multi-node-cluster
in repository https://gitbox.apache.org/repos/asf/kafka.git

 discard a87223e0e5b MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations
     add 27b0958466e fix CI compile error
     add e4f483232d6 MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations

This update added new revisions after undoing existing revisions
(a --force push from a87223e0e5b to e4f483232d6).

No new revisions were added by this update.

Summary of changes:
 .../kafka/coordinator/group/assignor/SimpleAssignorTest.java | 9 +
 1 file changed, 9 insertions(+)
(kafka) 01/01: MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations
cegerton pushed a commit to branch fix-dedicated-mirror-integration-test-multi-node-cluster
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a87223e0e5bf7289093de0c0bbec476e91c8dae2
Author: Chris Egerton
AuthorDate: Thu Jul 4 15:11:06 2024 -0400

    MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations

    This test is failing very frequently because our retry logic doesn't handle
    `RebalanceNeededException` instances that are wrapped in an `ExecutionException`;
    see [Gradle Enterprise](https://ge.apache.org/scans/tests?search.rootProjectNames=kafka=1720120130872=171946080=America%2FNew_York=org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest=testMultiNodeCluster()).
---
 .../connect/mirror/integration/DedicatedMirrorIntegrationTest.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
index 6e35e962ed9..df830027702 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
@@ -354,8 +354,10 @@ public class DedicatedMirrorIntegrationTest {
                     .map(TaskInfo::config)
                     .allMatch(predicate);
         } catch (Exception ex) {
-            if (ex instanceof RebalanceNeededException) {
-                // RebalanceNeededException should be retry-able.
+            boolean retriable = (ex instanceof RebalanceNeededException)
+                    || ((ex instanceof ExecutionException) && (ex.getCause() instanceof RebalanceNeededException));
+            if (retriable) {
+                // RebalanceNeededException should be retriable
                 // This happens when a worker has read a new config from the config topic, but hasn't completed the
                 // subsequent rebalance yet
                 throw ex;
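The fix above treats an exception as retriable both when it is a `RebalanceNeededException` itself and when it is an `ExecutionException` whose cause is one, which is what a failed `Future.get()` produces. A self-contained sketch of that check follows. The nested `RebalanceNeededException` here is a stand-in declared only so the example compiles on its own, and `RetriableCheck` and `isRetriable` are hypothetical names, not code from the Kafka repository.

```java
import java.util.concurrent.ExecutionException;

public class RetriableCheck {
    // Stand-in for Kafka Connect's RebalanceNeededException (assumption: declared
    // here only so that the sketch has no external dependencies)
    static class RebalanceNeededException extends RuntimeException {
        RebalanceNeededException(String message) {
            super(message);
        }
    }

    /**
     * Returns true if the exception is a RebalanceNeededException, either directly
     * or as the immediate cause of an ExecutionException.
     */
    static boolean isRetriable(Throwable ex) {
        return (ex instanceof RebalanceNeededException)
                || ((ex instanceof ExecutionException) && (ex.getCause() instanceof RebalanceNeededException));
    }

    public static void main(String[] args) {
        Throwable direct = new RebalanceNeededException("rebalance pending");
        Throwable wrapped = new ExecutionException(direct);
        Throwable unrelated = new ExecutionException(new IllegalStateException("other failure"));

        System.out.println("direct:    " + isRetriable(direct));
        System.out.println("wrapped:   " + isRetriable(wrapped));
        System.out.println("unrelated: " + isRetriable(unrelated));
    }
}
```

The check only looks one level deep, mirroring the diff; an exception nested further down the cause chain would not be treated as retriable.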
(kafka) branch fix-dedicated-mirror-integration-test-multi-node-cluster created (now a87223e0e5b)
cegerton pushed a change to branch fix-dedicated-mirror-integration-test-multi-node-cluster
in repository https://gitbox.apache.org/repos/asf/kafka.git

      at a87223e0e5b MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations

This branch includes the following new commits:

     new a87223e0e5b MINOR: Fix retry logic in DedicatedMirrorIntegrationTest::awaitTaskConfigurations

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
(kafka) branch trunk updated (4550550c7cb -> 27220d146c5)
cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 4550550c7cb MINOR: Cleanup TestPlugins and normalize TestPlugin enum (#1)
     add 27220d146c5 KAFKA-10816: Add health check endpoint for Kafka Connect (#16477)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/kafka/common/utils/Time.java       |   4 +-
 .../kafka/connect/cli/AbstractConnectCli.java          |  23 ++--
 .../kafka/connect/cli/ConnectDistributed.java          |   5 +-
 .../kafka/connect/cli/ConnectStandalone.java           |   9 +-
 .../kafka/connect/runtime/AbstractHerder.java          |  10 +-
 .../org/apache/kafka/connect/runtime/Connect.java      |  14 +--
 .../org/apache/kafka/connect/runtime/Herder.java       |  13 +-
 .../runtime/distributed/DistributedHerder.java         |  24 ++--
 .../connect/runtime/rest/HerderRequestHandler.java     |  13 +-
 .../connect/runtime/rest/RestRequestTimeout.java       |  19 +++
 .../kafka/connect/runtime/rest/RestServer.java         |  21 +++-
 .../runtime/rest/entities/WorkerStatus.java            |  99 +++
 .../runtime/rest/resources/RootResource.java           |  58 -
 .../runtime/standalone/HealthCheckThread.java          | 136 +
 .../runtime/standalone/StandaloneHerder.java           |  17 ++-
 .../java/org/apache/kafka/connect/util/Stage.java      |  16 +++
 .../connect/integration/BlockingConnectorTest.java     |   7 +-
 .../integration/ConnectWorkerIntegrationTest.java      |  50 +++-
 .../integration/InternalTopicsIntegrationTest.java     |  37 --
 .../SourceConnectorsIntegrationTest.java               |   2 +-
 .../StandaloneWorkerIntegrationTest.java               | 133 +++-
 .../resources/ConnectorPluginsResourceTest.java        |   8 +-
 .../rest/resources/ConnectorsResourceTest.java         |   8 +-
 .../resources/InternalConnectResourceTest.java         |   7 +-
 .../runtime/rest/resources/RootResourceTest.java       | 115 -
 .../connect/util/clusters/ConnectAssertions.java       |   2 +-
 .../connect/util/clusters/EmbeddedConnect.java         |  60 +
 .../util/clusters/EmbeddedConnectCluster.java          |   8 +-
 .../util/clusters/EmbeddedConnectStandalone.java       |  51 +++-
 .../util/clusters/EmbeddedKafkaCluster.java            |  54 +---
 .../kafka/connect/util/clusters/WorkerHandle.java      |  44 +++
 31 files changed, 911 insertions(+), 156 deletions(-)
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/WorkerStatus.java
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/HealthCheckThread.java
(kafka) branch trunk updated: KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests (#15302)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e93242283d1 KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests (#15302) e93242283d1 is described below commit e93242283d1b7c4855e97009adc130373e7bbec7 Author: Chris Egerton AuthorDate: Wed Jul 3 16:25:23 2024 +0200 KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests (#15302) Reviewers: Yash Mayya , Greg Harris --- .../runtime/rest/entities/ConnectorOffset.java | 8 +++ .../runtime/rest/entities/ConnectorOffsets.java| 5 ++ .../integration/OffsetsApiIntegrationTest.java | 62 +- 3 files changed, 63 insertions(+), 12 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java index 39b9815fdc6..2813a65c53c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java @@ -76,4 +76,12 @@ public class ConnectorOffset { public int hashCode() { return Objects.hash(partition, offset); } + +@Override +public String toString() { +return "{" + +"partition=" + partition + +", offset=" + offset + +'}'; +} } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java index e5869580b85..d37138a82ce 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java +++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsets.java @@ -88,4 +88,9 @@ public class ConnectorOffsets { public int hashCode() { return Objects.hashCode(offsets); } + +@Override +public String toString() { +return Objects.toString(offsets); +} } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index e0cde90e99d..4571e584502 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -37,6 +37,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; @@ -77,6 +79,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ @Tag("integration") public class OffsetsApiIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(OffsetsApiIntegrationTest.class); + private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; @@ -457,6 +462,11 @@ public class OffsetsApiIntegrationTest { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1, "Connector tasks did not start in time."); +// Make sure the tasks' consumers have had a chance to actually form a group +// (otherwise, the reset request will succeed because there won't be any active consumers) +verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION, +"Sink connector consumer group offsets should catch up to the topic end 
offsets"); + connect.stopConnector(connectorName); // Try to delete the offsets for the single topic partition @@ -802,6 +812,8 @@ public class OffsetsApiIntegrationTest { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1, "Connector tasks did not start in time."); +// Make sure the tasks' consumers have had a chance to actually form a group +// (otherwise, the reset request will succeed because there won't be any active consumers) verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION,
(kafka) branch trunk updated: KAFKA-16949: Fixing test_dynamic_logging in system test connect_distributed_test (#15915)
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new ceec218351f KAFKA-16949: Fixing test_dynamic_logging in system test connect_distributed_test (#15915)
ceec218351f is described below

commit ceec218351ff741dc338edf8d3810c1f36555e33
Author: vamossagar12
AuthorDate: Tue Jun 25 22:06:04 2024 +0530

    KAFKA-16949: Fixing test_dynamic_logging in system test connect_distributed_test (#15915)

    Reviewers: Chris Egerton
---
 tests/kafkatest/tests/connect/connect_distributed_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index a244d7e6846..18c97254f1a 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -21,7 +21,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, config_property, quorum, consumer_group
-from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
+from kafkatest.services.connect import ConnectDistributedService, ConnectServiceBase, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, LATEST_2_0, LATEST_1_1, LATEST_1_0, LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
@@ -468,7 +468,7 @@ class ConnectDistributedTest(Test):
         self.setup_services(num_workers=3)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
-        self.cc.start()
+        self.cc.start(mode=ConnectServiceBase.STARTUP_MODE_JOIN)
         worker = self.cc.nodes[0]
         initial_loggers = self.cc.get_all_loggers(worker)
(kafka) branch C0urante-patch-1 deleted (was 92bdce97d51)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git was 92bdce97d51 MINOR: Store separate output per test method The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(kafka) branch 3.8 updated: MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)
cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.8 by this push:
     new 15b62351a13 MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)
15b62351a13 is described below

commit 15b62351a138dd88c3680e37f7b2375faa27f72c
Author: Chris Egerton
AuthorDate: Thu Jun 13 05:43:33 2024 +0200

    MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)

    Reviewers: Greg Harris
---
 .../integration/ExactlyOnceSourceIntegrationTest.java | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 6d4b648201a..84ee814ae40 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -99,6 +99,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXA
 import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
 import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
 import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -747,9 +748,18 @@ public class ExactlyOnceSourceIntegrationTest {
         workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic);
         startConnect();
-        EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(1, brokerProps);
+
+        int numConnectorTargetedBrokers = 1;
+        EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(numConnectorTargetedBrokers, brokerProps);
         try (Closeable clusterShutdown = connectorTargetedCluster::stop) {
             connectorTargetedCluster.start();
+            // Wait for the connector-targeted Kafka cluster to get on its feet
+            waitForCondition(
+                    () -> connectorTargetedCluster.runningBrokers().size() == numConnectorTargetedBrokers,
+                    ConnectAssertions.WORKER_SETUP_DURATION_MS,
+                    "Separate Kafka cluster did not start in time"
+            );
+
             String topic = "test-topic";
             connectorTargetedCluster.createTopic(topic, 3);
@@ -777,6 +787,11 @@ public class ExactlyOnceSourceIntegrationTest {
             // start a source connector
             connect.configureConnector(CONNECTOR_NAME, props);
+            connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                    CONNECTOR_NAME,
+                    numTasks,
+                    "connector and tasks did not start in time"
+            );
             log.info("Waiting for records to be provided to worker by task");
             // wait for the connector tasks to produce enough records
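The readiness check added in this commit polls a condition (the number of running brokers) until it holds or a timeout expires, rather than assuming the cluster is usable as soon as `start()` returns. The pattern can be sketched in plain Java as below. This is a simplified stand-in, not the implementation of Kafka's `TestUtils.waitForCondition`; the class name `ReadinessWait`, the 10 ms poll interval, and the simulated readiness condition are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class ReadinessWait {
    /**
     * Polls the condition until it returns true, failing with an AssertionError
     * once maxWaitMs has elapsed. Hypothetical helper; the poll interval is arbitrary.
     */
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String details) {
        long deadlineNanos = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + details);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail fast instead of continuing to poll
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + details, e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated readiness: the "cluster" reports ready on the third poll
        AtomicInteger polls = new AtomicInteger();
        waitForCondition(
                () -> polls.incrementAndGet() >= 3,
                5_000,
                "Simulated cluster did not start in time"
        );
        System.out.println("ready after " + polls.get() + " polls");
    }
}
```

Failing with a descriptive message on timeout, as the commit does with "Separate Kafka cluster did not start in time", makes a slow startup distinguishable from a genuine test failure later on.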
(kafka) branch trunk updated (0a203a96220 -> 9ddd58bd6c0)
cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 0a203a96220 KAFKA-16938 non-dynamic props gets corrupted due to circular reference between DynamicBrokerConfig and DynamicConfig. (#16302)
     add 9ddd58bd6c0 MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)

No new revisions were added by this update.

Summary of changes:
 .../integration/ExactlyOnceSourceIntegrationTest.java | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
(kafka) branch 3.8 updated: KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (#16288)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 1b1821dbffc KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (#16288) 1b1821dbffc is described below commit 1b1821dbffcb1f26cf4b409a17f28900109b5a08 Author: Chris Egerton AuthorDate: Thu Jun 13 02:18:23 2024 +0200 KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (#16288) Reviewers: Greg Harris --- .../MirrorConnectorsIntegrationBaseTest.java | 4 -- .../connect/integration/BlockingConnectorTest.java | 5 -- .../integration/ConnectWorkerIntegrationTest.java | 66 +- .../ConnectorClientPolicyIntegrationTest.java | 2 - .../ConnectorRestartApiIntegrationTest.java| 2 - .../ConnectorTopicsIntegrationTest.java| 6 -- .../integration/ErrorHandlingIntegrationTest.java | 2 - .../ExactlyOnceSourceIntegrationTest.java | 3 - .../integration/InternalTopicsIntegrationTest.java | 14 ++--- .../integration/OffsetsApiIntegrationTest.java | 9 --- .../RebalanceSourceConnectorsIntegrationTest.java | 18 -- .../integration/RestExtensionIntegrationTest.java | 3 - .../SessionedProtocolIntegrationTest.java | 13 ++--- .../integration/SinkConnectorsIntegrationTest.java | 2 - .../SourceConnectorsIntegrationTest.java | 8 +-- .../integration/TransformationIntegrationTest.java | 12 .../connect/util/clusters/EmbeddedConnect.java | 24 +++- 17 files changed, 34 insertions(+), 159 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 4fbd282d11c..5398f21c626 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -219,16 +219,12 @@ public class MirrorConnectorsIntegrationBaseTest { .build(); primary.start(); -primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, -"Workers of " + PRIMARY_CLUSTER_ALIAS + "-connect-cluster did not start in time."); waitForTopicCreated(primary, "mm2-status.backup.internal"); waitForTopicCreated(primary, "mm2-offsets.backup.internal"); waitForTopicCreated(primary, "mm2-configs.backup.internal"); backup.start(); -backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, -"Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time."); primaryProducer = initializeProducer(primary); backupProducer = initializeProducer(backup); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 3eefee64c0d..532ab1baaf0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -134,11 +134,6 @@ public class BlockingConnectorTest { // start the clusters connect.start(); - -connect.assertions().assertAtLeastNumWorkersAreUp( -NUM_WORKERS, -"Initial group of workers did not start in time" -); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index c540016f104..84ff88013fe 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -172,9 +172,6 @@ public class 
ConnectWorkerIntegrationTest { // set up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); -connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, -"Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -218,9 +215,6 @@ public class ConnectWorkerIntegrationTest { props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
(kafka) branch trunk updated: KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (#16288)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e76e1da5ea5 KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (#16288) e76e1da5ea5 is described below commit e76e1da5ea5cca0df90880481ee092b1766831c4 Author: Chris Egerton AuthorDate: Thu Jun 13 02:18:23 2024 +0200 KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (#16288) Reviewers: Greg Harris --- .../MirrorConnectorsIntegrationBaseTest.java | 4 -- .../connect/integration/BlockingConnectorTest.java | 5 -- .../integration/ConnectWorkerIntegrationTest.java | 66 +- .../ConnectorClientPolicyIntegrationTest.java | 2 - .../ConnectorRestartApiIntegrationTest.java| 2 - .../ConnectorTopicsIntegrationTest.java| 6 -- .../integration/ErrorHandlingIntegrationTest.java | 2 - .../ExactlyOnceSourceIntegrationTest.java | 3 - .../integration/InternalTopicsIntegrationTest.java | 14 ++--- .../integration/OffsetsApiIntegrationTest.java | 9 --- .../RebalanceSourceConnectorsIntegrationTest.java | 18 -- .../integration/RestExtensionIntegrationTest.java | 3 - .../SessionedProtocolIntegrationTest.java | 13 ++--- .../integration/SinkConnectorsIntegrationTest.java | 2 - .../SourceConnectorsIntegrationTest.java | 8 +-- .../integration/TransformationIntegrationTest.java | 12 .../connect/util/clusters/EmbeddedConnect.java | 24 +++- 17 files changed, 34 insertions(+), 159 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 3f169b46920..d49435ab7ac 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java 
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -219,16 +219,12 @@ public class MirrorConnectorsIntegrationBaseTest { .build(); primary.start(); -primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, -"Workers of " + PRIMARY_CLUSTER_ALIAS + "-connect-cluster did not start in time."); waitForTopicCreated(primary, "mm2-status.backup.internal"); waitForTopicCreated(primary, "mm2-offsets.backup.internal"); waitForTopicCreated(primary, "mm2-configs.backup.internal"); backup.start(); -backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, -"Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time."); primaryProducer = initializeProducer(primary); backupProducer = initializeProducer(backup); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 3eefee64c0d..532ab1baaf0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -134,11 +134,6 @@ public class BlockingConnectorTest { // start the clusters connect.start(); - -connect.assertions().assertAtLeastNumWorkersAreUp( -NUM_WORKERS, -"Initial group of workers did not start in time" -); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index c540016f104..84ff88013fe 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -172,9 +172,6 @@ public class 
ConnectWorkerIntegrationTest { // set up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); -connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, -"Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -218,9 +215,6 @@ public class ConnectWorkerIntegrationTest { props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
(kafka) branch 3.8 updated: MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.8 by this push:
     new 520fbb4116b MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286)
520fbb4116b is described below

commit 520fbb4116b92bbb362e2a67f0b20ffc644f2903
Author: Chris Egerton
AuthorDate: Tue Jun 11 23:15:07 2024 +0200

    MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286)

    Reviewers: Greg Harris
---
 .../kafka/connect/integration/OffsetsApiIntegrationTest.java | 9 +
 1 file changed, 9 insertions(+)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index dc507b68df7..2da52cf9abd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -143,6 +143,15 @@ public class OffsetsApiIntegrationTest {
 
             result.start();
 
+            try {
+                result.assertions().assertExactlyNumWorkersAreUp(
+                        NUM_WORKERS,
+                        "Workers did not complete startup in time"
+                );
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Interrupted while awaiting cluster startup", e);
+            }
+
             return result;
         });
     }
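The pattern in the commit above (block until the cluster reports that it has started, and convert the checked InterruptedException into an unchecked one so the call can live inside a lambda) can be sketched in isolation. This is a minimal sketch, not the Kafka test code: StartupWaitSketch, awaitCondition and startAndAwait are hypothetical names, and the polling loop is only a stand-in for whatever assertExactlyNumWorkersAreUp does internally.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class StartupWaitSketch {

    // Hypothetical stand-in for a blocking assertion: poll until the condition
    // holds, or fail with the given message once the timeout has elapsed.
    static void awaitCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            Thread.sleep(10);
        }
    }

    // Wraps the checked exception so that the method can be called from a
    // lambda whose functional interface declares no checked exceptions.
    static <T> T startAndAwait(T cluster, BooleanSupplier started) {
        try {
            awaitCondition(started, 1_000, "Workers did not complete startup in time");
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while awaiting cluster startup", e);
        }
        return cluster;
    }

    public static void main(String[] args) {
        long readyAt = System.currentTimeMillis() + 100;
        // The "cluster" here is just a string; it becomes ready after roughly 100 ms.
        String cluster = startAndAwait("cluster", () -> System.currentTimeMillis() >= readyAt);
        System.out.println(cluster + " is ready");
    }
}
```

The caller only receives the cluster object once the startup condition has held, which is the property the commit adds to OffsetsApiIntegrationTest.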
(kafka) branch trunk updated (8b6013f851f -> 2fa2c72581d)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 8b6013f851f KAFKA-15045: (KIP-924 pt. 21) UUID to ProcessId migration (#16269)
     add 2fa2c72581d MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286)

No new revisions were added by this update.

Summary of changes:
 .../kafka/connect/integration/OffsetsApiIntegrationTest.java | 9 +
 1 file changed, 9 insertions(+)
(kafka) branch 3.8 updated: MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.8 by this push:
     new 5c13a6cf2f2 MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)
5c13a6cf2f2 is described below

commit 5c13a6cf2f2bb37a6f7b80595483a56cbd85a77f
Author: Chris Egerton
AuthorDate: Tue Jun 11 21:13:35 2024 +0200

    MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)

    Reviewers: Greg Harris
---
 .../kafka/connect/integration/ConnectWorkerIntegrationTest.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 83fce9231f7..c540016f104 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -1309,7 +1309,6 @@ public class ConnectWorkerIntegrationTest {
         // since failure to reconfigure the tasks (which may occur if the bug this test was written
         // to help catch resurfaces) will not cause existing tasks to fail or stop running
         StartAndStopLatch restarts = connectorHandle.expectedStarts(1);
-        connectorHandle.expectedCommits(NUM_TASKS * 2);
 
         final String secondConnectorTopic = "connector-topic-2";
         connect.kafka().createTopic(secondConnectorTopic, 1);
@@ -1323,6 +1322,9 @@ public class ConnectWorkerIntegrationTest {
                 "Connector tasks were not restarted in time",
                 restarts.await(10, TimeUnit.SECONDS)
         );
+
+        // Wait for at least one task to commit offsets after being restarted
+        connectorHandle.expectedCommits(1);
         connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
 
         final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
(kafka) branch trunk updated: MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new ac2a642ba99 MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)
ac2a642ba99 is described below

commit ac2a642ba99cde8a398510ce5cf503462863b489
Author: Chris Egerton
AuthorDate: Tue Jun 11 21:13:35 2024 +0200

    MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)

    Reviewers: Greg Harris
---
 .../kafka/connect/integration/ConnectWorkerIntegrationTest.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 83fce9231f7..c540016f104 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -1309,7 +1309,6 @@ public class ConnectWorkerIntegrationTest {
         // since failure to reconfigure the tasks (which may occur if the bug this test was written
         // to help catch resurfaces) will not cause existing tasks to fail or stop running
         StartAndStopLatch restarts = connectorHandle.expectedStarts(1);
-        connectorHandle.expectedCommits(NUM_TASKS * 2);
 
         final String secondConnectorTopic = "connector-topic-2";
         connect.kafka().createTopic(secondConnectorTopic, 1);
@@ -1323,6 +1322,9 @@ public class ConnectWorkerIntegrationTest {
                 "Connector tasks were not restarted in time",
                 restarts.await(10, TimeUnit.SECONDS)
         );
+
+        // Wait for at least one task to commit offsets after being restarted
+        connectorHandle.expectedCommits(1);
         connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
 
         final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
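The fix above moves the expectedCommits call to after the restart, so that only commits made after the restart are awaited. A minimal sketch of that ordering effect follows. CommitLatchSketch is a hypothetical stand-in built on CountDownLatch, not the real ConnectorHandle; in particular, returning a boolean from awaitCommits is an assumption made for the sake of the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CommitLatchSketch {
    // Starts at zero so that awaiting before any expectation is set returns immediately.
    private volatile CountDownLatch commits = new CountDownLatch(0);

    // Arm a fresh latch: only commits recorded after this call are counted.
    void expectedCommits(int count) {
        commits = new CountDownLatch(count);
    }

    // Called by a (simulated) task whenever it commits offsets.
    void recordCommit() {
        commits.countDown();
    }

    // Returns true if the expected number of commits arrived within the timeout.
    boolean awaitCommits(long timeoutMs) throws InterruptedException {
        return commits.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CommitLatchSketch handle = new CommitLatchSketch();

        handle.recordCommit();      // a commit from before the restart
        handle.expectedCommits(1);  // armed after the restart, as in the fixed test
        System.out.println("before post-restart commit: " + handle.awaitCommits(50));

        handle.recordCommit();      // a commit from a restarted task
        System.out.println("after post-restart commit: " + handle.awaitCommits(50));
    }
}
```

Because the latch is created after the restart, a commit that happened earlier cannot satisfy it, which matches the intent stated in the added code comment ("Wait for at least one task to commit offsets after being restarted").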
(kafka) branch 3.7 updated: KAFKA-9228: Restart tasks on runtime-only connector config changes (#16053)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.7 by this push: new b38d2eb0ea5 KAFKA-9228: Restart tasks on runtime-only connector config changes (#16053) b38d2eb0ea5 is described below commit b38d2eb0ea52f8f122b8fc366a5b946804d208ec Author: Chris Egerton AuthorDate: Mon Jun 10 23:02:08 2024 +0200 KAFKA-9228: Restart tasks on runtime-only connector config changes (#16053) Reviewers: Greg Harris --- checkstyle/suppressions.xml| 2 +- .../kafka/connect/runtime/AbstractHerder.java | 12 +- .../connect/storage/AppliedConnectorConfig.java| 66 ++ .../kafka/connect/storage/ClusterConfigState.java | 20 +++ .../connect/storage/KafkaConfigBackingStore.java | 12 +- .../connect/storage/MemoryConfigBackingStore.java | 18 +++ .../integration/ConnectWorkerIntegrationTest.java | 139 - .../kafka/connect/runtime/AbstractHerderTest.java | 41 ++ .../apache/kafka/connect/runtime/WorkerTest.java | 53 +++- .../kafka/connect/runtime/WorkerTestUtils.java | 10 +- .../runtime/distributed/DistributedHerderTest.java | 23 .../IncrementalCooperativeAssignorTest.java| 7 ++ .../runtime/distributed/WorkerCoordinatorTest.java | 10 ++ .../runtime/standalone/StandaloneHerderTest.java | 33 +++-- .../org.apache.kafka.connect.sink.SinkConnector| 3 +- 15 files changed, 433 insertions(+), 16 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7b9fcb6409c..9b1dd06cd3d 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -166,7 +166,7 @@ + files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask|Worker)Test.java"/> diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index ee7e38f3aaa..e33d6e04e14 100644 --- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -79,6 +79,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -851,7 +852,8 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con if (rawTaskProps.size() != currentNumTasks) { log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size()); result = true; -} else { +} +if (!result) { for (int index = 0; index < currentNumTasks; index++) { ConnectorTaskId taskId = new ConnectorTaskId(connName, index); if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) { @@ -860,6 +862,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con } } } +if (!result) { +Map appliedConnectorConfig = configState.appliedConnectorConfig(connName); +Map currentConnectorConfig = configState.connectorConfig(connName); +if (!Objects.equals(appliedConnectorConfig, currentConnectorConfig)) { +log.debug("Forcing task restart for connector {} as its configuration appears to be updated", connName); +result = true; +} +} if (result) { log.debug("Reconfiguring connector {}: writing new updated configurations for tasks", connName); } else { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java new file mode 100644 index 000..22f20e4b4a1 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is
(kafka) branch trunk updated (1426e8e9206 -> eec8fd6a98c)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 1426e8e9206 KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. (#16043)
     add eec8fd6a98c KAFKA-9228: Restart tasks on runtime-only connector config changes (#16053)

No new revisions were added by this update.

Summary of changes:
 .../kafka/connect/runtime/AbstractHerder.java      |  11 +-
 .../connect/storage/AppliedConnectorConfig.java    |  66 ++
 .../kafka/connect/storage/ClusterConfigState.java  |  20 +++
 .../connect/storage/KafkaConfigBackingStore.java   |  12 +-
 .../connect/storage/MemoryConfigBackingStore.java  |  18 +++
 .../integration/ConnectWorkerIntegrationTest.java  | 135 +
 .../kafka/connect/runtime/AbstractHerderTest.java  |  41 +++
 .../apache/kafka/connect/runtime/WorkerTest.java   |   5 +
 .../kafka/connect/runtime/WorkerTestUtils.java     |  10 +-
 .../runtime/distributed/DistributedHerderTest.java |  26
 .../IncrementalCooperativeAssignorTest.java        |   7 ++
 .../runtime/distributed/WorkerCoordinatorTest.java |  10 ++
 .../runtime/standalone/StandaloneHerderTest.java   |  26 +++-
 .../org.apache.kafka.connect.sink.SinkConnector    |   3 +-
 14 files changed, 383 insertions(+), 7 deletions(-)
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java
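The decision logic that KAFKA-9228 adds to AbstractHerder.taskConfigsChanged, as shown in the 3.7 diff, has three checks: a changed task count, a changed raw config for any task, and finally a connector config that differs from the one last applied. The sketch below reproduces that ordering with plain collections. TaskConfigChangeSketch and its parameter names are hypothetical; the real method reads these values from a ClusterConfigState rather than taking them as arguments.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

class TaskConfigChangeSketch {

    // Returns true if new task configs should be written (and tasks restarted).
    static boolean taskConfigsChanged(
            List<Map<String, String>> storedRawTaskConfigs,
            List<Map<String, String>> newRawTaskConfigs,
            Map<String, String> appliedConnectorConfig,
            Map<String, String> currentConnectorConfig) {
        // 1. A different number of tasks always counts as a change.
        if (newRawTaskConfigs.size() != storedRawTaskConfigs.size()) {
            return true;
        }
        // 2. Same count: compare each task's raw config with the stored one.
        for (int index = 0; index < storedRawTaskConfigs.size(); index++) {
            if (!newRawTaskConfigs.get(index).equals(storedRawTaskConfigs.get(index))) {
                return true;
            }
        }
        // 3. Task configs are identical: still force a restart if the connector
        //    config has changed since task configs were last generated from it.
        return !Objects.equals(appliedConnectorConfig, currentConnectorConfig);
    }

    public static void main(String[] args) {
        List<Map<String, String>> tasks = List.of(Map.of("topic", "a"));
        Map<String, String> applied = Map.of("name", "c", "value.converter", "X");
        Map<String, String> current = Map.of("name", "c", "value.converter", "Y");

        // Task configs are unchanged, but a runtime-only connector property differs.
        System.out.println(taskConfigsChanged(tasks, tasks, applied, current));
        // Nothing differs at all.
        System.out.println(taskConfigsChanged(tasks, tasks, applied, applied));
    }
}
```

The third check is what makes a runtime-only change visible: a property such as a converter (used here purely as an illustrative key) may leave every generated task config untouched, so the first two checks alone would report no change.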
(kafka) branch 3.8 updated: MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (#16197)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.8 by this push:
     new ec278a68649 MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (#16197)
ec278a68649 is described below

commit ec278a6864926cdfd2f4117e53048734f3f392e7
Author: Chris Egerton
AuthorDate: Tue Jun 4 21:04:34 2024 +0200

    MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (#16197)

    Reviewers: Greg Harris, Chia-Ping Tsai
---
 clients/src/main/java/org/apache/kafka/clients/admin/Admin.java | 8
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index d936ec80ffe..291250aae91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -901,7 +901,7 @@ public interface Admin extends AutoCloseable {
      * List the consumer groups available in the cluster.
      *
      * @param options The options to use when listing the consumer groups.
-     * @return The ListGroupsResult.
+     * @return The ListConsumerGroupsResult.
      */
     ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
 
@@ -911,7 +911,7 @@ public interface Admin extends AutoCloseable {
      * This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
      * See the overload for more details.
      *
-     * @return The ListGroupsResult.
+     * @return The ListConsumerGroupsResult.
      */
     default ListConsumerGroupsResult listConsumerGroups() {
         return listConsumerGroups(new ListConsumerGroupsOptions());
@@ -921,7 +921,7 @@ public interface Admin extends AutoCloseable {
      * List the consumer group offsets available in the cluster.
      *
      * @param options The options to use when listing the consumer group offsets.
-     * @return The ListGroupOffsetsResult
+     * @return The ListConsumerGroupOffsetsResult
      */
     default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
         @SuppressWarnings("deprecation")
@@ -939,7 +939,7 @@ public interface Admin extends AutoCloseable {
      * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
      * to list offsets of all partitions of one group with default options.
      *
-     * @return The ListGroupOffsetsResult.
+     * @return The ListConsumerGroupOffsetsResult.
      */
     default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
         return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
(kafka) branch 3.7 updated: MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (#16197)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.7 by this push:
     new c5049febaea MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (#16197)
c5049febaea is described below

commit c5049febaea4d6ae74e96aa9b63b94e2c9ef5e47
Author: Chris Egerton
AuthorDate: Tue Jun 4 21:04:34 2024 +0200

    MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (#16197)

    Reviewers: Greg Harris, Chia-Ping Tsai
---
 clients/src/main/java/org/apache/kafka/clients/admin/Admin.java | 8
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index ff7f4e661d6..03151fde675 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -901,7 +901,7 @@ public interface Admin extends AutoCloseable {
      * List the consumer groups available in the cluster.
      *
      * @param options The options to use when listing the consumer groups.
-     * @return The ListGroupsResult.
+     * @return The ListConsumerGroupsResult.
      */
     ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
 
@@ -911,7 +911,7 @@ public interface Admin extends AutoCloseable {
      * This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
      * See the overload for more details.
      *
-     * @return The ListGroupsResult.
+     * @return The ListConsumerGroupsResult.
      */
     default ListConsumerGroupsResult listConsumerGroups() {
         return listConsumerGroups(new ListConsumerGroupsOptions());
@@ -921,7 +921,7 @@ public interface Admin extends AutoCloseable {
      * List the consumer group offsets available in the cluster.
      *
      * @param options The options to use when listing the consumer group offsets.
-     * @return The ListGroupOffsetsResult
+     * @return The ListConsumerGroupOffsetsResult
      */
     default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
         @SuppressWarnings("deprecation")
@@ -939,7 +939,7 @@ public interface Admin extends AutoCloseable {
      * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
      * to list offsets of all partitions of one group with default options.
      *
-     * @return The ListGroupOffsetsResult.
+     * @return The ListConsumerGroupOffsetsResult.
      */
     default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
         return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
(kafka) branch trunk updated (5ce4b91dfa6 -> 2b4779840cb)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 5ce4b91dfa6 KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling (#16147)
     add 2b4779840cb MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (#16197)

No new revisions were added by this update.

Summary of changes:
 clients/src/main/java/org/apache/kafka/clients/admin/Admin.java | 8
 1 file changed, 4 insertions(+), 4 deletions(-)
(kafka) branch 3.7 updated: KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.7 by this push: new 579c1202a75 KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122) 579c1202a75 is described below commit 579c1202a75ee4d517ca63f6b7ccbce68dd8c566 Author: Chris Egerton AuthorDate: Tue Jun 4 15:36:24 2024 +0200 KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122) Reviewers: Mickael Maison --- .../kafka/connect/runtime/AbstractHerder.java | 10 +- .../runtime/distributed/DistributedHerder.java | 4 +- .../runtime/standalone/StandaloneHerder.java | 4 +- .../connect/storage/KafkaConfigBackingStore.java | 29 ++- .../integration/ConnectWorkerIntegrationTest.java | 199 + .../kafka/connect/runtime/AbstractHerderTest.java | 27 +++ .../util/clusters/EmbeddedKafkaCluster.java| 16 +- 7 files changed, 273 insertions(+), 16 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index e9a3a094470..ee7e38f3aaa 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -117,7 +117,7 @@ import java.util.stream.Collectors; */ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener { -private final Logger log = LoggerFactory.getLogger(AbstractHerder.class); +private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class); private final String workerId; protected final Worker worker; @@ -845,16 +845,16 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con return result; } -public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> taskProps) { +public static boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> rawTaskProps) { int currentNumTasks = configState.taskCount(connName); boolean result = false; -if (taskProps.size() != currentNumTasks) { -log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size()); +if (rawTaskProps.size() != currentNumTasks) { +log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size()); result = true; } else { for (int index = 0; index < currentNumTasks; index++) { ConnectorTaskId taskId = new ConnectorTaskId(connName, index); -if (!taskProps.get(index).equals(configState.taskConfig(taskId))) { +if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) { log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index); result = true; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index d9beb40792e..60d5cabe8d3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -2188,11 +2188,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } private void publishConnectorTaskConfigs(String connName, List> taskProps, Callback cb) { -if (!taskConfigsChanged(configState, connName, taskProps)) { +List> rawTaskProps = reverseTransform(connName, configState, taskProps); +if (!taskConfigsChanged(configState, connName, rawTaskProps)) { return; } -List> rawTaskProps = reverseTransform(connName, configState, taskProps); if (isLeader()) { 
writeTaskConfigs(connName, rawTaskProps); cb.onCompletion(null, null); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 40e19da19c7..d51a3e82495 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/St
(kafka) branch 3.8 updated: KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 7404fdffa67 KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122) 7404fdffa67 is described below commit 7404fdffa671ed55188444fc96319b4dc301da74 Author: Chris Egerton AuthorDate: Tue Jun 4 15:36:24 2024 +0200 KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122) Reviewers: Mickael Maison --- .../kafka/connect/runtime/AbstractHerder.java | 10 +- .../runtime/distributed/DistributedHerder.java | 4 +- .../runtime/standalone/StandaloneHerder.java | 4 +- .../connect/storage/KafkaConfigBackingStore.java | 29 ++- .../integration/ConnectWorkerIntegrationTest.java | 199 + .../kafka/connect/runtime/AbstractHerderTest.java | 27 +++ .../KafkaConfigBackingStoreMockitoTest.java| 53 +- .../util/clusters/EmbeddedKafkaCluster.java| 16 +- 8 files changed, 325 insertions(+), 17 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 2a27103079a..c6aeea80a26 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -124,7 +124,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_C */ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener { -private final Logger log = LoggerFactory.getLogger(AbstractHerder.class); +private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class); private final String workerId; protected final Worker worker; 
@@ -1039,16 +1039,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return result; } -public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> taskProps) { +public static boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> rawTaskProps) { int currentNumTasks = configState.taskCount(connName); boolean result = false; -if (taskProps.size() != currentNumTasks) { -log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size()); +if (rawTaskProps.size() != currentNumTasks) { +log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size()); result = true; } else { for (int index = 0; index < currentNumTasks; index++) { ConnectorTaskId taskId = new ConnectorTaskId(connName, index); -if (!taskProps.get(index).equals(configState.taskConfig(taskId))) { +if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) { log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index); result = true; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index cdffbb87871..ab46ee536ac 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -2229,11 +2229,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } private void publishConnectorTaskConfigs(String connName, List> taskProps, Callback cb) { -if (!taskConfigsChanged(configState, connName, taskProps)) { +List> rawTaskProps = reverseTransform(connName, configState, taskProps); +if (!taskConfigsChanged(configState, connName, rawTaskProps)) { return; } -List> 
rawTaskProps = reverseTransform(connName, configState, taskProps); if (isLeader()) { writeTaskConfigs(connName, rawTaskProps); cb.onCompletion(null, null); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index e773eeefd5c..2768d910d4b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime
(kafka) branch trunk updated (a08db65670d -> 0409003c439)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from a08db65670d KAFKA-16888 Fix failed StorageToolTest.testFormatSucceedsIfAllDirectoriesAreAvailable and StorageToolTest.testFormatEmptyDirectory (#16186)
     add 0409003c439 KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors, and compare raw task configs before publishing them (#16122)

No new revisions were added by this update.

Summary of changes:
 .../kafka/connect/runtime/AbstractHerder.java      |  10 +-
 .../runtime/distributed/DistributedHerder.java     |   4 +-
 .../runtime/standalone/StandaloneHerder.java       |   4 +-
 .../connect/storage/KafkaConfigBackingStore.java   |  29 ++-
 .../integration/ConnectWorkerIntegrationTest.java  | 199 +
 .../kafka/connect/runtime/AbstractHerderTest.java  |  27 +++
 .../KafkaConfigBackingStoreMockitoTest.java        |  53 +-
 .../util/clusters/EmbeddedKafkaCluster.java        |  16 +-
 8 files changed, 325 insertions(+), 17 deletions(-)
(kafka) branch trunk updated: MINOR: Use project path instead of name for spotlessApplyModules in Gradle script (#16177)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new c6c39c0062b MINOR: Use project path instead of name for spotlessApplyModules in Gradle script (#16177)
c6c39c0062b is described below

commit c6c39c0062b60d4b51a09b72d7d9c0e8618a00f1
Author: Chris Egerton
AuthorDate: Mon Jun 3 21:19:18 2024 +0200

    MINOR: Use project path instead of name for spotlessApplyModules in Gradle script (#16177)

    Reviewers: Chia-Ping Tsai
---
 build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index a2a6531d29a..cd191877502 100644
--- a/build.gradle
+++ b/build.gradle
@@ -799,7 +799,7 @@ subprojects {
     skipConfigurations = [ "zinc" ]
   }
 
-  if (project.name in spotlessApplyModules) {
+  if (project.path in spotlessApplyModules) {
     apply plugin: 'com.diffplug.spotless'
     spotless {
       java {
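The difference between matching on a project's name and matching on its path can be sketched as follows. The commit message does not state its motivation, so this only illustrates one case where the two checks diverge: by default a Gradle project's name is the last segment of its colon-separated path, so two subprojects in different parents can share a name. The project paths below are hypothetical, and the sketch is written in Java rather than Groovy.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ProjectPathSketch {

    // Default Gradle behaviour (an assumption here: no name override in settings):
    // the project name is the last segment of the path, e.g. ":alpha:api" -> "api".
    static String nameOf(String path) {
        return path.substring(path.lastIndexOf(':') + 1);
    }

    // Analogue of `project.name in modules`.
    static List<String> selectByName(List<String> projectPaths, Set<String> modules) {
        return projectPaths.stream()
                .filter(path -> modules.contains(nameOf(path)))
                .collect(Collectors.toList());
    }

    // Analogue of `project.path in modules`.
    static List<String> selectByPath(List<String> projectPaths, Set<String> modules) {
        return projectPaths.stream()
                .filter(modules::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> projects = List.of(":alpha:api", ":beta:api", ":beta:runtime");

        // Matching by name cannot tell the two "api" projects apart.
        System.out.println(selectByName(projects, Set.of("api")));
        // Matching by path selects exactly the listed project.
        System.out.println(selectByPath(projects, Set.of(":alpha:api")));
    }
}
```

A path is unique within a build, whereas a name need not be, so a list keyed by path identifies each module unambiguously.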
(kafka) branch 3.7 updated: KAFKA-16881: InitialState type leaks into the Connect REST API OpenAPI spec (#16175)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.7 by this push:
     new 1565fb6dd2f KAFKA-16881: InitialState type leaks into the Connect REST API OpenAPI spec (#16175)
1565fb6dd2f is described below

commit 1565fb6dd2f984250c73233cd024112b011fcbde
Author: Ken Huang <100591800+m1a...@users.noreply.github.com>
AuthorDate: Tue Jun 4 02:34:58 2024 +0900

    KAFKA-16881: InitialState type leaks into the Connect REST API OpenAPI spec (#16175)

    Reviewers: Chris Egerton
---
 .../kafka/connect/runtime/rest/entities/CreateConnectorRequest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
index da8e235e424..9d338936dbb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
@@ -47,7 +47,7 @@ public class CreateConnectorRequest {
         return config;
     }
 
-    @JsonProperty
+    @JsonProperty("initial_state")
     public InitialState initialState() {
         return initialState;
     }
(kafka) branch 3.8 updated: KAFKA-16881: InitialState type leaks into the Connect REST API OpenAPI spec (#16175)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.8 by this push:
     new 495ec16fb23 KAFKA-16881: InitialState type leaks into the Connect REST API OpenAPI spec (#16175)
495ec16fb23 is described below

commit 495ec16fb23310ccf863beecb32cbf0687114af2
Author: Ken Huang <100591800+m1a...@users.noreply.github.com>
AuthorDate: Tue Jun 4 02:34:58 2024 +0900

    KAFKA-16881: InitialState type leaks into the Connect REST API OpenAPI spec (#16175)

    Reviewers: Chris Egerton
---
 .../kafka/connect/runtime/rest/entities/CreateConnectorRequest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
index da8e235e424..9d338936dbb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
@@ -47,7 +47,7 @@ public class CreateConnectorRequest {
         return config;
     }
 
-    @JsonProperty
+    @JsonProperty("initial_state")
     public InitialState initialState() {
         return initialState;
     }
(kafka) branch trunk updated (04e6ef47508 -> 5cbc1d616ac)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 04e6ef47508 KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry (#16169)
     add 5cbc1d616ac KAFKA-16881: InitialState type leaks into the Connect REST API OpenAPI spec (#16175)

No new revisions were added by this update.

Summary of changes:
 .../kafka/connect/runtime/rest/entities/CreateConnectorRequest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
(kafka) branch trunk updated: Minor: Add URL to log for Connect RestClient (#16166)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 966f2eb3efd Minor: Add URL to log for Connect RestClient (#16166)
966f2eb3efd is described below

commit 966f2eb3efd929a4c270c502514aa74ea1414677
Author: Farbod Ahmadian
AuthorDate: Sat Jun 1 22:25:16 2024 +0200

    Minor: Add URL to log for Connect RestClient (#16166)

    Reviewers: Chris Egerton
---
 .../main/java/org/apache/kafka/connect/runtime/rest/RestClient.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 9a47a0e7530..37673ee0557 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -190,15 +190,15 @@ public class RestClient {
                         "Unexpected status code when handling forwarded request: " + responseCode);
             }
         } catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
-            log.error("IO error forwarding REST request: ", e);
+            log.error("IO error forwarding REST request to {} :", url, e);
             throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
         } catch (ConnectRestException e) {
             // catching any explicitly thrown ConnectRestException-s to preserve its status code
             // and to avoid getting it overridden by the more generic catch (Throwable) clause down below
-            log.error("Error forwarding REST request", e);
+            log.error("Error forwarding REST request to {} :", url, e);
             throw e;
         } catch (Throwable t) {
-            log.error("Error forwarding REST request", t);
+            log.error("Error forwarding REST request to {} :", url, t);
             throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to forward REST request: " + t.getMessage(), t);
         }
     }
(kafka) branch trunk updated: KAFKA-16844: Add ByteBuffer support for Connect ByteArrayConverter (#16101)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 32b2b73f673 KAFKA-16844: Add ByteBuffer support for Connect ByteArrayConverter (#16101)
32b2b73f673 is described below

commit 32b2b73f673ecd41d17c03e99db3746c517990c4
Author: Fan Yang
AuthorDate: Thu May 30 23:26:25 2024 +0800

    KAFKA-16844: Add ByteBuffer support for Connect ByteArrayConverter (#16101)

    Reviewers: Chris Egerton
---
 .../connect/converters/ByteArrayConverter.java | 16 +--
 .../connect/converters/ByteArrayConverterTest.java | 24 ++
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
index 6d17873d072..ec934ad56cc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.ConverterConfig;
 import org.apache.kafka.connect.storage.HeaderConverter;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 /**
@@ -59,10 +60,10 @@ public class ByteArrayConverter implements Converter, HeaderConverter, Versioned
         if (schema != null && schema.type() != Schema.Type.BYTES)
             throw new DataException("Invalid schema type for ByteArrayConverter: " + schema.type().toString());
 
-        if (value != null && !(value instanceof byte[]))
+        if (value != null && !(value instanceof byte[]) && !(value instanceof ByteBuffer))
             throw new DataException("ByteArrayConverter is not compatible with objects of type " + value.getClass());
 
-        return (byte[]) value;
+        return value instanceof ByteBuffer ? getBytesFromByteBuffer((ByteBuffer) value) : (byte[]) value;
     }
 
     @Override
@@ -84,4 +85,15 @@ public class ByteArrayConverter implements Converter, HeaderConverter, Versioned
     public void close() {
         // do nothing
     }
+
+    private byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) {
+        if (byteBuffer == null) {
+            return null;
+        }
+
+        byteBuffer.rewind();
+        byte[] bytes = new byte[byteBuffer.remaining()];
+        byteBuffer.get(bytes);
+        return bytes;
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java
index e971a3151c7..748fd70e63f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.DataException;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 
@@ -76,6 +77,29 @@ public class ByteArrayConverterTest {
         assertNull(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, null));
     }
 
+    @Test
+    public void testFromConnectByteBufferValue() {
+        ByteBuffer buffer = ByteBuffer.wrap(SAMPLE_BYTES);
+        assertArrayEquals(
+                SAMPLE_BYTES,
+                converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
+
+        buffer.rewind();
+        buffer.get(); // Move the position
+        assertArrayEquals(
+                SAMPLE_BYTES,
+                converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
+
+        buffer = null;
+        assertNull(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
+
+        byte[] emptyBytes = new byte[0];
+        buffer = ByteBuffer.wrap(emptyBytes);
+        assertArrayEquals(
+                emptyBytes,
+                converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
+    }
+
     @Test
     public void testToConnect() {
         SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_BYTES);
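Editorial note on the commit above: the new helper always rewinds the buffer before copying, so the converter returns the bytes from position 0 up to the limit regardless of where the caller left the position. A side effect is that the caller's buffer ends up fully consumed. The sketch below reproduces the helper's logic outside of Kafka so this behaviour can be checked in isolation; the class name `ByteBufferCopySketch` is hypothetical and not part of the commit.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical standalone class; only the helper body is taken from the diff above.
class ByteBufferCopySketch {

    // Same logic as the helper in the diff: rewind to position 0, then copy
    // everything up to the buffer's limit into a fresh array.
    static byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            return null;
        }
        byteBuffer.rewind();
        byte[] bytes = new byte[byteBuffer.remaining()];
        byteBuffer.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        byte[] sample = {1, 2, 3, 4};
        ByteBuffer buffer = ByteBuffer.wrap(sample);
        buffer.get(); // advance the position past the first byte

        byte[] copied = getBytesFromByteBuffer(buffer);
        System.out.println(Arrays.toString(copied)); // prints [1, 2, 3, 4]
        System.out.println(buffer.remaining());      // prints 0: the caller's buffer was consumed
    }
}
```

Callers that need to keep reading from the same buffer afterwards could pass `buffer.duplicate()` instead, since a duplicate shares content but has its own position.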
(kafka) branch trunk updated: KAFKA-16684: Fix flaky DedicatedMirrorIntegrationTest (#15906)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 0cfc2983a58 KAFKA-16684: Fix flaky DedicatedMirrorIntegrationTest (#15906)
0cfc2983a58 is described below

commit 0cfc2983a58e8e8dc15eccf7a413804cd9062f26
Author: Johnny Hsu <44309740+johnnych...@users.noreply.github.com>
AuthorDate: Sat May 11 02:42:26 2024 +0800

    KAFKA-16684: Fix flaky DedicatedMirrorIntegrationTest (#15906)

    Reviewers: Chris Egerton
---
 .../mirror/integration/DedicatedMirrorIntegrationTest.java | 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
index 37fe834e504..07bbafd7516 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
 import org.apache.kafka.connect.mirror.SourceAndTarget;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -259,10 +260,9 @@ public class DedicatedMirrorIntegrationTest {
         // wait for heartbeat connector to start running
         awaitConnectorTasksStart(mirrorMakers.get("node 0"), MirrorHeartbeatConnector.class, sourceAndTarget);
 
-        // Create one topic per Kafka cluster per MirrorMaker node
-        final int topicsPerCluster = numNodes;
         final int messagesPerTopic = 10;
-        for (int i = 0; i < topicsPerCluster; i++) {
+        // Create one topic per Kafka cluster per MirrorMaker node
+        for (int i = 0; i < numNodes; i++) {
             String topic = testTopicPrefix + i;
 
             // Create the topic on cluster A
@@ -353,6 +353,12 @@ public class DedicatedMirrorIntegrationTest {
                     .map(TaskInfo::config)
                     .allMatch(predicate);
         } catch (Exception ex) {
+            if (ex instanceof RebalanceNeededException) {
+                // RebalanceNeededException should be retry-able.
+                // This happens when a worker has read a new config from the config topic, but hasn't completed the
+                // subsequent rebalance yet
+                throw ex;
+            }
             log.error("Something unexpected occurred. Unable to get configuration of connector {} for mirror maker with source->target={}", connName, sourceAndTarget, ex);
             throw new NoRetryException(ex);
         }
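Editorial note on the commit above: the fix changes how one exception is classified. A `RebalanceNeededException` is rethrown unchanged so that the surrounding wait loop can try again, while every other exception is still wrapped in `NoRetryException` and ends the wait. The sketch below illustrates that pattern with stand-in types only; `TransientException`, `FatalException`, `classify` and `poll` are hypothetical names, and the polling loop is an assumption about how such a wait helper behaves, not Kafka's test utility.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch; none of these types come from Kafka.
class RetryClassificationSketch {

    // Stand-in for an error that is expected to clear up on its own (e.g. a pending rebalance).
    static class TransientException extends RuntimeException {
        TransientException(String message) {
            super(message);
        }
    }

    // Stand-in for a wrapper that tells the polling loop to give up immediately.
    static class FatalException extends RuntimeException {
        FatalException(Throwable cause) {
            super(cause);
        }
    }

    // Mirrors the patched catch block: transient errors pass through, everything else is wrapped.
    static boolean classify(Callable<Boolean> check) {
        try {
            return check.call();
        } catch (Exception ex) {
            if (ex instanceof TransientException) {
                throw (TransientException) ex;
            }
            throw new FatalException(ex);
        }
    }

    // Polls until the check passes; retries on TransientException, lets FatalException escape.
    static boolean poll(Callable<Boolean> check, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (classify(check)) {
                    return true;
                }
            } catch (TransientException e) {
                // expected to resolve by itself; fall through and try again
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean ok = poll(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new TransientException("rebalance pending");
            }
            return true;
        }, 5);
        System.out.println(ok + " after " + calls[0] + " calls"); // prints "true after 3 calls"
    }
}
```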
(kafka) branch trunk updated: KAFKA-16445: Add PATCH method for connector config (#6934)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5a9ccb6b773 KAFKA-16445: Add PATCH method for connector config (#6934) 5a9ccb6b773 is described below commit 5a9ccb6b7733a910b2f3a6a9433fd09cfef394c1 Author: Ivan Yurchenko AuthorDate: Thu May 9 22:59:09 2024 +0300 KAFKA-16445: Add PATCH method for connector config (#6934) Reviewers: Chris Egerton --- .../kafka/connect/runtime/AbstractHerder.java | 1 - .../org/apache/kafka/connect/runtime/Herder.java | 8 ++ .../runtime/distributed/DistributedHerder.java | 112 +++--- .../runtime/rest/resources/ConnectorsResource.java | 13 +++ .../runtime/standalone/StandaloneHerder.java | 26 + .../apache/kafka/connect/util/ConnectUtils.java| 25 .../integration/ConnectWorkerIntegrationTest.java | 37 ++ .../runtime/distributed/DistributedHerderTest.java | 127 + .../rest/resources/ConnectorsResourceTest.java | 42 +++ .../runtime/standalone/StandaloneHerderTest.java | 73 .../kafka/connect/util/ConnectUtilsTest.java | 20 .../connect/util/clusters/EmbeddedConnect.java | 37 ++ docs/connect.html | 1 + 13 files changed, 479 insertions(+), 43 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 174b032ea24..2a27103079a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1185,5 +1185,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return loggers.setLevel(namespace, level); } - } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
index a8a6e7858d8..f33fa6fac46 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -121,6 +121,14 @@ public interface Herder { void putConnectorConfig(String connName, Map config, TargetState targetState, boolean allowReplace, Callback> callback); +/** + * Patch the configuration for a connector. + * @param connName name of the connector + * @param configPatch the connector's configuration patch. + * @param callback callback to invoke when the configuration has been written + */ +void patchConnectorConfig(String connName, Map configPatch, Callback> callback); + /** * Delete a connector and its configuration. * @param connName name of the connector diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 22a9640d4ef..f3f2ae7e934 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1096,54 +1096,82 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.trace("Submitting connector config write request {}", connName); addRequest( () -> { -validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> { -if (error != null) { -callback.onCompletion(error, null); -return; -} - -// Complete the connector config write via another herder request in order to -// perform the write to the backing store (or forward to the leader) during -// the "external request" portion of the tick loop -addRequest( -() -> { -if (maybeAddConfigErrors(configInfos, callback)) { -return null; -} - -log.trace("Handling connector config request {}", connName); -if (!isLeader()) { -callback.onCompletion(new 
NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null); -return null; -} -
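Editorial note on the commit above: the Javadoc in the diff only says that `patchConnectorConfig` takes a "configuration patch". The sketch below shows one plausible merge rule for such a patch, under the assumption that a non-null value overwrites or adds a key and a null value removes it. That rule, the class name `ConfigPatchSketch` and the method `applyPatch` are illustrative assumptions; the part of the diff that implements the merge is cut off in this archive.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of patch-style config merging; not the code from the commit.
class ConfigPatchSketch {

    // Assumed rule: non-null values overwrite or add keys, null values remove keys.
    static Map<String, String> applyPatch(Map<String, String> current, Map<String, String> patch) {
        Map<String, String> result = new HashMap<>(current);
        patch.forEach((key, value) -> {
            if (value == null) {
                result.remove(key);
            } else {
                result.put(key, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("connector.class", "ExampleSourceConnector"); // made-up connector name
        current.put("tasks.max", "1");
        current.put("topic", "old-topic");

        Map<String, String> patch = new HashMap<>(); // HashMap because Map.of rejects null values
        patch.put("tasks.max", "4"); // overwrite
        patch.put("topic", null);    // remove

        // TreeMap only to print the keys in a stable order
        System.out.println(new TreeMap<>(applyPatch(current, patch)));
        // prints {connector.class=ExampleSourceConnector, tasks.max=4}
    }
}
```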
(kafka) branch 3.7 updated: KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.7 by this push: new 88dd2b16f90 KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149) 88dd2b16f90 is described below commit 88dd2b16f90da00a527eb5f1ef739a99a144738b Author: Chris Egerton AuthorDate: Wed Jan 10 09:03:23 2024 -0500 KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149) Reviewers: Sagar Rao , Greg Harris --- checkstyle/import-control.xml | 5 + .../connect/mirror/rest/MirrorRestServer.java | 23 - .../rest/resources/InternalMirrorResource.java | 11 +- .../org/apache/kafka/connect/runtime/Worker.java | 10 +- .../connect/runtime/rest/ConnectRestServer.java| 31 -- .../connect/runtime/rest/HerderRequestHandler.java | 15 +-- ...onnectResource.java => RestRequestTimeout.java} | 25 ++--- .../kafka/connect/runtime/rest/RestServer.java | 111 ++--- .../runtime/rest/resources/ConnectResource.java| 40 .../rest/resources/ConnectorPluginsResource.java | 19 ++-- .../runtime/rest/resources/ConnectorsResource.java | 21 ++-- .../rest/resources/InternalClusterResource.java| 12 +-- .../rest/resources/InternalConnectResource.java| 7 +- .../runtime/rest/resources/LoggingResource.java| 10 +- .../runtime/rest/resources/RootResource.java | 10 +- .../connect/integration/BlockingConnectorTest.java | 4 +- .../integration/ConnectWorkerIntegrationTest.java | 2 +- .../apache/kafka/connect/runtime/WorkerTest.java | 8 +- .../runtime/rest/ConnectRestServerTest.java| 35 --- .../resources/ConnectorPluginsResourceTest.java| 5 +- .../rest/resources/ConnectorsResourceTest.java | 16 +-- .../resources/InternalConnectResourceTest.java | 3 +- 22 files changed, 219 insertions(+), 204 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 8842ec70c29..5fc845d48e9 100644 --- 
a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -514,6 +514,9 @@ + + + @@ -528,6 +531,8 @@ + + diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java index 7f1fe2841a3..a5abeff40ce 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java @@ -22,7 +22,9 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServerConfig; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; +import org.glassfish.hk2.api.TypeLiteral; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; import java.util.Arrays; import java.util.Collection; @@ -45,15 +47,28 @@ public class MirrorRestServer extends RestServer { } @Override -protected Collection regularResources() { +protected Collection> regularResources() { return Arrays.asList( -new InternalMirrorResource(herders, restClient) +InternalMirrorResource.class ); } @Override -protected Collection adminResources() { +protected Collection> adminResources() { return Collections.emptyList(); } +@Override +protected void configureRegularResources(ResourceConfig resourceConfig) { +resourceConfig.register(new Binder()); +} + +private class Binder extends AbstractBinder { +@Override +protected void configure() { +bind(herders).to(new TypeLiteral>() { }); +bind(restClient).to(RestClient.class); +} +} + } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java 
index 8b5150f56ac..5c46bd9c6c5 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java @@ -19,10 +19,12 @@ package org.apache.kafka.connect.mirror.rest.resources; import org.apache.kafka.connect.mirror.SourceAndTarget; import org.apache.kafka.connect.runtime.Her
(kafka) branch trunk updated: KAFKA-15018: Write connector tombstone offsets to secondary store before primary store (#13801)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 525b9b1d768 KAFKA-15018: Write connector tombstone offsets to secondary store before primary store (#13801) 525b9b1d768 is described below commit 525b9b1d7682ae2a527ceca83fedca44b1cba11a Author: vamossagar12 AuthorDate: Tue May 7 23:59:02 2024 +0530 KAFKA-15018: Write connector tombstone offsets to secondary store before primary store (#13801) Reviewers: Chris Egerton --- .../storage/ConnectorOffsetBackingStore.java | 144 ++- .../storage/ConnectorOffsetBackingStoreTest.java | 456 + 2 files changed, 596 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java index 075d00f3090..7a18fca9793 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java @@ -39,6 +39,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; /** @@ -253,8 +255,23 @@ public class ConnectorOffsetBackingStore implements OffsetBackingStore { * If configured to use a connector-specific offset store, the returned {@link Future} corresponds to a * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global * store is provided, a secondary write is made to that store if the write to the connector-specific store - * succeeds. 
Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in - * {@link Callback}; they are only logged as a warning to users. + * succeeds. + * + * Normally, errors with this secondary write are not reflected in the returned {@link Future} or the passed-in + * {@link Callback}; they are only logged as a warning to users. The only exception to this rule is when the + * offsets that need to be committed contain tombstone records. + * When the to-be-committed offsets contain tombstones, offset commits take place in three phases: + * + * First, only the tombstone offsets are written to the worker-global store. Failures during this step will + * be reflected in the returned {@link Future} and reported to the passed-in {@link Callback}. + * If and only if the previous write to the worker-global store succeeded, all offsets (both tombstones and + * non-tombstones) are written to the connector-specific store. Failures during this step will also be + * reflected in the returned {@link Future} and reported to the passed-in {@link Callback}. + * Finally, if and only if the previous write to the connector-specific store succeeded, all offsets with + * non-tombstone values are written to the worker-global store. Failures during this step will only be reported + * as warning log messages, and will not be reflected in the returned {@link Future} or reported to the + * passed-in {@link Callback}. + * * * If not configured to use a connector-specific offset store, the returned {@link Future} corresponds to a * write to the worker-global offset store, and the passed-in {@link Callback} is invoked once that write completes. 
@@ -262,6 +279,10 @@ public class ConnectorOffsetBackingStore implements OffsetBackingStore { * @param values map from key to value * @param callback callback to invoke on completion of the primary write * @return void future for the primary write +* + * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15018">KAFKA-15018</a> for context on the three-step + * write sequence + * */ @Override public Future set(Map values, Callback callback) { @@ -279,7 +300,38 @@ public class ConnectorOffsetBackingStore implements OffsetBackingStore { throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +Map regularOffsets = new HashMap<>(); +Map tombstoneOffsets = new HashMap<>(); +values.forEach((partition, offset) -> { +if (offset == null) { +tombstoneOffsets.put(partition, null); +} else { +regul
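Editorial note on the commit above: the Javadoc describes a three-phase write when the offsets contain tombstones (null values), and the truncated diff begins by splitting the incoming map into tombstone and regular offsets. The sketch below shows that split and the resulting write order under simplifying assumptions: keys and values are plain strings rather than byte buffers, both stores are assumed to be configured, and the "writes" are only descriptive labels. `TombstoneSplitSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the split and ordering described in the Javadoc; performs no real writes.
class TombstoneSplitSketch {

    // Entries whose offset is null are tombstones (a request to delete the stored offset).
    static Map<String, String> tombstones(Map<String, String> values) {
        Map<String, String> result = new HashMap<>();
        values.forEach((partition, offset) -> {
            if (offset == null) {
                result.put(partition, null);
            }
        });
        return result;
    }

    // Entries with a non-null offset.
    static Map<String, String> regular(Map<String, String> values) {
        Map<String, String> result = new HashMap<>();
        values.forEach((partition, offset) -> {
            if (offset != null) {
                result.put(partition, offset);
            }
        });
        return result;
    }

    // Order of writes as described in the Javadoc, assuming both stores are configured.
    static List<String> writeOrder(Map<String, String> values) {
        List<String> steps = new ArrayList<>();
        if (tombstones(values).isEmpty()) {
            steps.add("connector-specific store: all offsets");
            steps.add("worker-global store: all offsets (failures only logged)");
        } else {
            steps.add("worker-global store: tombstones only (failures reported)");
            steps.add("connector-specific store: all offsets (failures reported)");
            steps.add("worker-global store: non-tombstones only (failures only logged)");
        }
        return steps;
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("partition-0", "42");
        values.put("partition-1", null); // tombstone
        writeOrder(values).forEach(System.out::println);
    }
}
```

Writing the tombstones to the worker-global store first means a deleted offset cannot survive there once the connector-specific store has accepted the deletion, which appears to be the purpose of the ordering described in the Javadoc.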
(kafka) branch trunk updated: KAFKA-14226: Introduce FieldPath abstraction and nested path support for ExtractField SMT (#15379)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a4c6cefd10f KAFKA-14226: Introduce FieldPath abstraction and nested path support for ExtractField SMT (#15379) a4c6cefd10f is described below commit a4c6cefd10f3db206c17b69fac5bab71521f28f9 Author: Jorge Esteban Quilcate Otoya AuthorDate: Tue May 7 21:07:18 2024 +0300 KAFKA-14226: Introduce FieldPath abstraction and nested path support for ExtractField SMT (#15379) Reviewers: Chris Egerton --- checkstyle/import-control.xml | 1 + .../kafka/connect/transforms/ExtractField.java | 27 ++- .../transforms/field/FieldSyntaxVersion.java | 90 .../connect/transforms/field/SingleFieldPath.java | 242 + .../kafka/connect/transforms/ExtractFieldTest.java | 71 +- .../transforms/field/FieldPathNotationTest.java| 160 ++ .../transforms/field/FieldSyntaxVersionTest.java | 71 ++ .../transforms/field/SingleFieldPathTest.java | 117 ++ 8 files changed, 770 insertions(+), 9 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index fc378eb7dd2..71cb83dab8f 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -658,6 +658,7 @@ + diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java index 7a6fd4ae202..1b3a77799df 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java @@ -23,6 +23,8 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import 
org.apache.kafka.connect.transforms.field.SingleFieldPath; +import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Map; @@ -40,12 +42,20 @@ public abstract class ExtractField> implements Transf private static final String FIELD_CONFIG = "field"; -public static final ConfigDef CONFIG_DEF = new ConfigDef() -.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract."); +public static final ConfigDef CONFIG_DEF = FieldSyntaxVersion.appendConfigTo( +new ConfigDef() +.define( +FIELD_CONFIG, +ConfigDef.Type.STRING, +ConfigDef.NO_DEFAULT_VALUE, +ConfigDef.Importance.MEDIUM, +"Field name to extract." +)); private static final String PURPOSE = "field extraction"; -private String fieldName; +private SingleFieldPath fieldPath; +private String originalPath; @Override public String version() { @@ -55,7 +65,8 @@ public abstract class ExtractField> implements Transf @Override public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); -fieldName = config.getString(FIELD_CONFIG); +originalPath = config.getString(FIELD_CONFIG); +fieldPath = new SingleFieldPath(originalPath, FieldSyntaxVersion.fromConfig(config)); } @Override @@ -63,16 +74,16 @@ public abstract class ExtractField> implements Transf final Schema schema = operatingSchema(record); if (schema == null) { final Map value = requireMapOrNull(operatingValue(record), PURPOSE); -return newRecord(record, null, value == null ? null : value.get(fieldName)); +return newRecord(record, null, value == null ? 
null : fieldPath.valueFrom(value)); } else { final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); -Field field = schema.field(fieldName); +Field field = fieldPath.fieldFrom(schema); if (field == null) { -throw new IllegalArgumentException("Unknown field: " + fieldName); +throw new IllegalArgumentException("Unknown field: " + originalPath); } -return newRecord(record, field.schema(), value == null ? null : value.get(fieldName)); +return newRecord(record, field.schema(), value == null ? null : fieldPath.valueFrom(value)); } } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersion.java b
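Editorial note on the commit above: `ExtractField` now delegates lookups to a `SingleFieldPath`, so a configured field can point into nested structures rather than only a top-level key. The sketch below illustrates the general idea for schemaless (map) values, assuming a plain dot-separated path with no escaping. The real `SingleFieldPath` syntax is defined in files that are truncated in this archive, so the parsing rule here is an assumption, and `NestedPathSketch` is a hypothetical name.

```java
import java.util.Map;

// Hypothetical sketch of nested-path lookup in schemaless values; not Kafka's SingleFieldPath.
class NestedPathSketch {

    // Walks the dot-separated path one step at a time; returns null as soon as a step is missing
    // or an intermediate value is not a map.
    static Object valueFrom(Map<String, Object> root, String path) {
        Object current = root;
        for (String step : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(step);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = Map.of("id", 7);
        Map<String, Object> root = Map.of("user", inner, "flat", "x");

        System.out.println(valueFrom(root, "user.id"));      // prints 7
        System.out.println(valueFrom(root, "flat"));         // prints x
        System.out.println(valueFrom(root, "user.missing")); // prints null
    }
}
```

A production implementation would also need a way to address field names that themselves contain dots, which is presumably part of what the syntax-version setting in the commit controls.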
(kafka) branch trunk updated: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters (#14309)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 05df10449eb KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters (#14309) 05df10449eb is described below commit 05df10449eb9c95fe6d6055b302c84686be8058d Author: Chris Egerton AuthorDate: Tue May 7 17:30:57 2024 +0200 KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters (#14309) Reviewers: Greg Harris --- .../kafka/connect/runtime/AbstractHerder.java | 240 +++-- .../kafka/connect/runtime/ConnectorConfig.java | 6 +- .../ConnectorValidationIntegrationTest.java| 90 +++- .../org.apache.kafka.connect.storage.Converter | 4 +- ...rg.apache.kafka.connect.storage.HeaderConverter | 4 +- 5 files changed, 309 insertions(+), 35 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 574d800a88e..174b032ea24 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -52,6 +52,8 @@ import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.transforms.Transformation; @@ -79,6 +81,7 @@ import java.util.LinkedList; import 
java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -90,6 +93,10 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; + /** * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions * must invoke the lifecycle hooks appropriately. @@ -392,6 +399,161 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return configDef.validateAll(config); } +/** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). + * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + * may be null, in which case no validation will be performed under the assumption that the + * connector will use inherit the converter settings from the worker. 
Some errors encountered + * during validation may be {@link ConfigValue#addErrorMessage(String) added} to this object + * @param pluginInterface the interface for the plugin type + *(e.g., {@code org.apache.kafka.connect.storage.Converter.class}); + *may not be null + * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef} + * from an instance of the plugin type (e.g., {@code Converter::config}); + * may not be null + * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"}); + * may not be null + * @param pluginProperty the property used to define a custom class for the plugin type + * in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); + * may not be null + * @param defaultProperties any default properties to include in the configuration that will be used for + * the plugin; may be null + + * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config, + * or null if either no custom validation was performed (possibly because no cus
(kafka) branch 3.6 updated: KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new 88de570d0e3 KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562) 88de570d0e3 is described below commit 88de570d0e3d79c7a5989266ccc12012dcf55bda Author: Chris Egerton AuthorDate: Wed Mar 20 15:54:22 2024 + KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562) Reviewers: Yash Mayya --- .../java/org/apache/kafka/connect/storage/OffsetUtils.java | 6 -- .../java/org/apache/kafka/connect/storage/OffsetUtilsTest.java | 10 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java index 1d6632e60cb..6c98f427921 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java @@ -115,8 +115,10 @@ public class OffsetUtils { } if (!(keyList.get(1) instanceof Map)) { -log.warn("Ignoring offset partition key with an unexpected format for the second element in the partition key list. " + -"Expected type: {}, actual type: {}", Map.class.getName(), className(keyList.get(1))); +if (keyList.get(1) != null) { +log.warn("Ignoring offset partition key with an unexpected format for the second element in the partition key list. 
" + +"Expected type: {}, actual type: {}", Map.class.getName(), className(keyList.get(1))); +} return; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java index 06fb51d3ca6..ba81a700b44 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java @@ -134,6 +134,16 @@ public class OffsetUtilsTest { } } +@Test +public void testProcessPartitionKeyNullPartition() { +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) { +Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>(); +OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name", null)), new byte[0], CONVERTER, connectorPartitions); +assertEquals(Collections.emptyMap(), connectorPartitions); +assertEquals(0, logCaptureAppender.getMessages().size()); +} +} + private byte[] serializePartitionKey(Object key) { return CONVERTER.fromConnectData("", null, key); }
(kafka) branch 3.7 updated: KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.7 by this push: new 9d013dd586d KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562) 9d013dd586d is described below commit 9d013dd586d817b0dfa2b644afbbe4d1cab0c6cf Author: Chris Egerton AuthorDate: Wed Mar 20 15:54:22 2024 + KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562) Reviewers: Yash Mayya --- .../java/org/apache/kafka/connect/storage/OffsetUtils.java | 6 -- .../java/org/apache/kafka/connect/storage/OffsetUtilsTest.java | 10 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java index 1d6632e60cb..6c98f427921 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java @@ -115,8 +115,10 @@ public class OffsetUtils { } if (!(keyList.get(1) instanceof Map)) { -log.warn("Ignoring offset partition key with an unexpected format for the second element in the partition key list. " + -"Expected type: {}, actual type: {}", Map.class.getName(), className(keyList.get(1))); +if (keyList.get(1) != null) { +log.warn("Ignoring offset partition key with an unexpected format for the second element in the partition key list. 
" + +"Expected type: {}, actual type: {}", Map.class.getName(), className(keyList.get(1))); +} return; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java index 06fb51d3ca6..ba81a700b44 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java @@ -134,6 +134,16 @@ public class OffsetUtilsTest { } } +@Test +public void testProcessPartitionKeyNullPartition() { +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) { +Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>(); +OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name", null)), new byte[0], CONVERTER, connectorPartitions); +assertEquals(Collections.emptyMap(), connectorPartitions); +assertEquals(0, logCaptureAppender.getMessages().size()); +} +} + private byte[] serializePartitionKey(Object key) { return CONVERTER.fromConnectData("", null, key); }
(kafka) branch trunk updated: KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f1d741a9c10 KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562) f1d741a9c10 is described below commit f1d741a9c1057eb40a57d6ffbc81edb7e529e1f5 Author: Chris Egerton AuthorDate: Wed Mar 20 15:54:22 2024 + KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562) Reviewers: Yash Mayya --- .../java/org/apache/kafka/connect/storage/OffsetUtils.java | 6 -- .../java/org/apache/kafka/connect/storage/OffsetUtilsTest.java | 10 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java index 1d6632e60cb..6c98f427921 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java @@ -115,8 +115,10 @@ public class OffsetUtils { } if (!(keyList.get(1) instanceof Map)) { -log.warn("Ignoring offset partition key with an unexpected format for the second element in the partition key list. " + -"Expected type: {}, actual type: {}", Map.class.getName(), className(keyList.get(1))); +if (keyList.get(1) != null) { +log.warn("Ignoring offset partition key with an unexpected format for the second element in the partition key list. 
" + +"Expected type: {}, actual type: {}", Map.class.getName(), className(keyList.get(1))); +} return; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java index 06fb51d3ca6..ba81a700b44 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java @@ -134,6 +134,16 @@ public class OffsetUtilsTest { } } +@Test +public void testProcessPartitionKeyNullPartition() { +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) { +Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>(); +OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name", null)), new byte[0], CONVERTER, connectorPartitions); +assertEquals(Collections.emptyMap(), connectorPartitions); +assertEquals(0, logCaptureAppender.getMessages().size()); +} +} + private byte[] serializePartitionKey(Object key) { return CONVERTER.fromConnectData("", null, key); }
(kafka) branch trunk updated: KAFKA-15575: Begin enforcing 'tasks.max' property for connectors (#15180)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4f0a4059083 KAFKA-15575: Begin enforcing 'tasks.max' property for connectors (#15180) 4f0a4059083 is described below commit 4f0a40590833a141c78341ce95ffc782747c5ac8 Author: Chris Egerton AuthorDate: Thu Feb 1 11:33:04 2024 -0500 KAFKA-15575: Begin enforcing 'tasks.max' property for connectors (#15180) Reviewers: Ashwin Pankaj , Greg Harris --- checkstyle/suppressions.xml| 2 +- .../kafka/common/utils/LogCaptureAppender.java | 8 + .../kafka/connect/runtime/ConnectorConfig.java | 19 ++ .../connect/runtime/TooManyTasksException.java | 43 .../org/apache/kafka/connect/runtime/Worker.java | 79 -- .../kafka/connect/runtime/WorkerConnector.java | 31 ++- .../runtime/distributed/DistributedHerder.java | 6 +- .../connect/integration/BlockingConnectorTest.java | 24 +- .../integration/ConnectWorkerIntegrationTest.java | 159 .../integration/MonitorableSourceConnector.java| 20 +- .../integration/OffsetsApiIntegrationTest.java | 4 + .../kafka/connect/runtime/AbstractHerderTest.java | 12 +- .../apache/kafka/connect/runtime/WorkerTest.java | 274 - .../runtime/distributed/DistributedHerderTest.java | 42 .../connect/util/clusters/ConnectAssertions.java | 23 ++ .../connect/util/clusters/EmbeddedConnect.java | 17 ++ 16 files changed, 721 insertions(+), 42 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 9b63e009693..925c50dec86 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -151,7 +151,7 @@ files="LoggingResource.java" /> + files="(RestServer|AbstractHerder|DistributedHerder|Worker(Test)?).java"/> diff --git a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java index 
4f035840bd2..eb592c11863 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java @@ -24,6 +24,7 @@ import org.apache.log4j.spi.LoggingEvent; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseable { private final List events = new LinkedList<>(); @@ -100,6 +101,13 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl } } +public List getMessages(String level) { +return getEvents().stream() +.filter(e -> level.equals(e.getLevel())) +.map(Event::getMessage) +.collect(Collectors.toList()); +} + public List getMessages() { final LinkedList result = new LinkedList<>(); synchronized (events) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index f33f00e40ef..d5cdc23fa36 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -115,6 +115,16 @@ public class ConnectorConfig extends AbstractConfig { private static final String TASK_MAX_DISPLAY = "Tasks max"; +public static final String TASKS_MAX_ENFORCE_CONFIG = "tasks.max.enforce"; +private static final String TASKS_MAX_ENFORCE_DOC = +"(Deprecated) Whether to enforce that the tasks.max property is respected by the connector. " ++ "By default, connectors that generate too many tasks will fail, and existing sets of tasks that exceed the tasks.max property will also be failed. 
" ++ "If this property is set to false, then connectors will be allowed to generate more than the maximum number of tasks, and existing sets of " ++ "tasks that exceed the tasks.max property will be allowed to run. " ++ "This property is deprecated and will be removed in an upcoming major release."; +public static final boolean TASKS_MAX_ENFORCE_DEFAULT = true; +private static final String TASKS_MAX_ENFORCE_DISPLAY = "Enforce tasks max"; + public static final String TRANSFORMS_CONFIG = "transforms"; private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to
(kafka) branch trunk updated: KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7bc4afee113 KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249) 7bc4afee113 is described below commit 7bc4afee11353eccc5d31491693d1dc6e0bba6f7 Author: Chris Egerton AuthorDate: Thu Feb 1 09:20:24 2024 -0500 KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249) Reviewers: Greg Harris , Yash Mayya --- .../connect/integration/BlockingConnectorTest.java | 5 - .../integration/ConnectWorkerIntegrationTest.java | 3 --- .../integration/ExactlyOnceSourceIntegrationTest.java | 2 +- .../kafka/connect/util/clusters/EmbeddedConnect.java | 18 -- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 2e415d1e31e..8465a1e31e3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -138,11 +138,6 @@ public class BlockingConnectorTest { NUM_WORKERS, "Initial group of workers did not start in time" ); - -try (Response response = connect.requestGet(connect.endpointForResource("connectors/nonexistent"))) { -// hack: make sure the worker is actually up (has joined the cluster, created and read to the end of internal topics, etc.) 
-assertEquals(404, response.getStatus()); -} } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index ec4d256c6e6..9dca7425d66 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -238,9 +238,6 @@ public class ConnectWorkerIntegrationTest { connect.kafka().stopOnlyKafka(); -connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, -"Group of workers did not remain the same after broker shutdown"); - // Allow for the workers to discover that the coordinator is unavailable, wait is // heartbeat timeout * 2 + 4sec Thread.sleep(TimeUnit.SECONDS.toMillis(10)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index b793cbf0209..09ea3f5ae4b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -500,7 +500,7 @@ public class ExactlyOnceSourceIntegrationTest { connectorHandle.expectedCommits(MINIMUM_MESSAGES); // make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around) -assertEquals(404, connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus()); +connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker did not complete startup in time"); // fence out the leader of the cluster Producer zombieLeader = transactionalProducer( diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index 147e435adf6..af6c60e847e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -32,7 +32,6 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; -import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.SinkUtils; @@
(kafka) branch revert-15247-C0urante-patch-1 deleted (was a76d1ade22d)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch revert-15247-C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git was a76d1ade22d Revert "MINOR: Fix "No suitable checks publisher found" message during CI build" The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(kafka) branch trunk updated (3e9ef708536 -> 99295421c13)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 3e9ef708536 KAFKA-15853: Move PasswordEncoder to server-common (#15246) add 99295421c13 Revert "MINOR: Fix "No suitable checks publisher found" message during CI build" (#15292) No new revisions were added by this update. Summary of changes: Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(kafka) branch revert-15247-C0urante-patch-1 created (now a76d1ade22d)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch revert-15247-C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git at a76d1ade22d Revert "MINOR: Fix "No suitable checks publisher found" message during CI build" No new revisions were added by this update.
(kafka) 01/01: MINOR: Store separate output per test method
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 92bdce97d517e9cc31db527100cf9cdd89a9f55c Author: Chris Egerton AuthorDate: Wed Jan 24 14:48:16 2024 -0500 MINOR: Store separate output per test method --- build.gradle | 18 ++ 1 file changed, 18 insertions(+) diff --git a/build.gradle b/build.gradle index 2ca19c9a323..d6eb8ba4e4b 100644 --- a/build.gradle +++ b/build.gradle @@ -438,6 +438,12 @@ subprojects { } logTestStdout.rehydrate(delegate, owner, this)() +reports { + junitXml { +outputPerTestCase = true + } +} + exclude testsToExclude if (shouldUseJUnit5) @@ -466,6 +472,12 @@ subprojects { } logTestStdout.rehydrate(delegate, owner, this)() +reports { + junitXml { +outputPerTestCase = true + } +} + exclude testsToExclude if (shouldUseJUnit5) { @@ -509,6 +521,12 @@ subprojects { } logTestStdout.rehydrate(delegate, owner, this)() +reports { + junitXml { +outputPerTestCase = true + } +} + exclude testsToExclude if (shouldUseJUnit5) {
(kafka) branch C0urante-patch-1 created (now 92bdce97d51)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git at 92bdce97d51 MINOR: Store separate output per test method This branch includes the following new commits: new 92bdce97d51 MINOR: Store separate output per test method The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(kafka) branch C0urante-patch-1 deleted (was 0195172e377)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git was 0195172e377 MINOR: Fix "No suitable checks publisher found" message during CI build The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(kafka) branch trunk updated: MINOR: Fix "No suitable checks publisher found" message during CI build (#15247)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a5b2a9f0e1e MINOR: Fix "No suitable checks publisher found" message during CI build (#15247) a5b2a9f0e1e is described below commit a5b2a9f0e1e70b5efdc23d7b668cb572d80c5ab3 Author: Chris Egerton AuthorDate: Wed Jan 24 11:21:53 2024 -0500 MINOR: Fix "No suitable checks publisher found" message during CI build (#15247) This message keeps popping up in our CI builds during the "Archive JUnit-formatted test results" step, and can be misleading since it appears to indicate that something is wrong. --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 50b7f6a298e..0b2d14c0b72 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -33,7 +33,7 @@ def doTest(env, target = "test") { sh """./gradlew -PscalaVersion=$SCALA_VERSION ${target} \ --profile --continue -PkeepAliveMode="session" -PtestLoggingEvents=started,passed,skipped,failed \ -PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=10""" - junit '**/build/test-results/**/TEST-*.xml' + junit skipPublishingChecks: true, testResults: '**/build/test-results/**/TEST-*.xml' } def doStreamsArchetype() {
(kafka) 01/01: MINOR: Fix "No suitable checks publisher found" message during CI build
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 0195172e3779b9cc9d7ad8965aae75ca016dcfb5 Author: Chris Egerton AuthorDate: Tue Jan 23 09:40:58 2024 -0500 MINOR: Fix "No suitable checks publisher found" message during CI build This message keeps popping up in our CI builds during the "Archive JUnit-formatted test results" step, and can be misleading since it appears to indicate that something is wrong. --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 50b7f6a298e..0b2d14c0b72 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -33,7 +33,7 @@ def doTest(env, target = "test") { sh """./gradlew -PscalaVersion=$SCALA_VERSION ${target} \ --profile --continue -PkeepAliveMode="session" -PtestLoggingEvents=started,passed,skipped,failed \ -PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=10""" - junit '**/build/test-results/**/TEST-*.xml' + junit skipPublishingChecks: true, testResults: '**/build/test-results/**/TEST-*.xml' } def doStreamsArchetype() {
(kafka) branch C0urante-patch-1 created (now 0195172e377)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git at 0195172e377 MINOR: Fix "No suitable checks publisher found" message during CI build This branch includes the following new commits: new 0195172e377 MINOR: Fix "No suitable checks publisher found" message during CI build The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(kafka) branch trunk updated: MINOR: Stop leaking threads in BlockingConnectorTest (#12290)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a989329ee37 MINOR: Stop leaking threads in BlockingConnectorTest (#12290) a989329ee37 is described below commit a989329ee37be9c53c8974228860bd1a23e6fd09 Author: Chris Egerton AuthorDate: Thu Jan 18 11:11:32 2024 -0500 MINOR: Stop leaking threads in BlockingConnectorTest (#12290) Reviewers: Kvicii , Viktor Somogyi-Vass , Greg Harris --- .../connect/integration/BlockingConnectorTest.java | 105 - .../integration/OffsetsApiIntegrationTest.java | 4 + 2 files changed, 88 insertions(+), 21 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 6655e5a01ca..2e415d1e31e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; @@ -49,10 +51,12 @@ import javax.ws.rs.core.Response; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,6 +68,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -144,7 +149,8 @@ public class BlockingConnectorTest { public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); -Block.resetBlockLatch(); +// unblock everything so that we don't leak threads after each test run +Block.reset(); } @Test @@ -382,14 +388,19 @@ public class BlockingConnectorTest { connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); } -private static class Block { -private static CountDownLatch blockLatch; +public static class Block { +// All latches that blocking connectors/tasks are or will be waiting on during a test case +private static final Set BLOCK_LATCHES = new HashSet<>(); +// All threads that are or were at one point blocked +private static final Set BLOCKED_THREADS = new HashSet<>(); +// The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point +private static CountDownLatch awaitBlockLatch; private final String block; public static final String BLOCK_CONFIG = "block"; -private static ConfigDef config() { +public static ConfigDef config() { return new ConfigDef() .define( BLOCK_CONFIG, @@ -401,31 +412,71 @@ public class BlockingConnectorTest { ); } +/** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. 
+ */ public static void waitForBlock() throws InterruptedException, TimeoutException { +Timer timer = Time.SYSTEM.timer(CONNECTOR_BLOCK_TIMEOUT_MS); + +CountDownLatch awaitBlockLatch; synchronized (Block.class) { -if (blockLatch == null) { -throw new IllegalArgumentException("No connector has been created yet"); +while (Block.awaitBlockLatch == null) { +timer.update(); +if (timer.isExpired()) { +throw new TimeoutException("Timed out waiting for connector to block."); +} +Block.class.wait(timer.remainingMs()); } +
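The waitForBlock() hunk above is cut off by the archive, so the following is only a rough, standalone sketch of the pattern it appears to use: wait under a monitor, with a deadline, until a latch has been registered, then wait on that latch with whatever time remains. It is not the code from the commit. The class name AwaitBlockSketch and the methods registerBlock() and reset() are hypothetical, and System.nanoTime() stands in for Kafka's Time/Timer utilities so that the example has no dependencies.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the test's Block class; names are illustrative only.
final class AwaitBlockSketch {
    // Latch for the most recently registered blocking point; null until one is registered
    private static CountDownLatch awaitBlockLatch;

    // Called by the thread that is about to block: publishes a fresh latch and wakes any waiters.
    // The caller counts the returned latch down once it has reached its blocking point.
    static CountDownLatch registerBlock() {
        synchronized (AwaitBlockSketch.class) {
            awaitBlockLatch = new CountDownLatch(1);
            AwaitBlockSketch.class.notifyAll();
            return awaitBlockLatch;
        }
    }

    // Clears the registered latch so that state does not carry over between test cases.
    static void reset() {
        synchronized (AwaitBlockSketch.class) {
            awaitBlockLatch = null;
        }
    }

    // Waits until a latch has been registered and counted down, or throws once timeoutMs has elapsed.
    static void waitForBlock(long timeoutMs) throws InterruptedException, TimeoutException {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        CountDownLatch latch;
        synchronized (AwaitBlockSketch.class) {
            // Loop to guard against spurious wakeups; wait(0) would block forever, so check first
            while (awaitBlockLatch == null) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
                if (remainingMs <= 0) {
                    throw new TimeoutException("Timed out waiting for a blocking point to be registered");
                }
                AwaitBlockSketch.class.wait(remainingMs);
            }
            latch = awaitBlockLatch;
        }
        // Await outside the monitor so that registerBlock() callers are never held up by a waiter
        long remainingNs = Math.max(deadlineNs - System.nanoTime(), 0L);
        if (!latch.await(remainingNs, TimeUnit.NANOSECONDS)) {
            throw new TimeoutException("Timed out waiting for the blocking point to be reached");
        }
    }
}
```

Compared with failing immediately when no latch exists yet (the behavior of the removed lines in the hunk), waiting with a deadline tolerates the case where the test thread calls waitForBlock() before the connector or task has been created.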
(kafka) branch 3.6 updated: KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new fd0d43f3c42 KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149) fd0d43f3c42 is described below commit fd0d43f3c422b9c3782e7f6c04d59e88e4dcee45 Author: Chris Egerton AuthorDate: Wed Jan 10 09:03:23 2024 -0500 KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149) Reviewers: Sagar Rao , Greg Harris --- checkstyle/import-control.xml | 5 + .../connect/mirror/rest/MirrorRestServer.java | 23 - .../rest/resources/InternalMirrorResource.java | 11 +- .../org/apache/kafka/connect/runtime/Worker.java | 10 +- .../connect/runtime/rest/ConnectRestServer.java| 26 +++-- .../connect/runtime/rest/HerderRequestHandler.java | 16 +-- ...onnectResource.java => RestRequestTimeout.java} | 25 ++--- .../kafka/connect/runtime/rest/RestServer.java | 111 ++--- .../runtime/rest/resources/ConnectResource.java| 40 .../rest/resources/ConnectorPluginsResource.java | 19 ++-- .../runtime/rest/resources/ConnectorsResource.java | 21 ++-- .../rest/resources/InternalClusterResource.java| 12 +-- .../rest/resources/InternalConnectResource.java| 7 +- .../runtime/rest/resources/LoggingResource.java| 8 +- .../runtime/rest/resources/RootResource.java | 10 +- .../connect/integration/BlockingConnectorTest.java | 4 +- .../apache/kafka/connect/runtime/WorkerTest.java | 8 +- .../runtime/rest/ConnectRestServerTest.java| 35 --- .../resources/ConnectorPluginsResourceTest.java| 5 +- .../rest/resources/ConnectorsResourceTest.java | 16 +-- .../resources/InternalConnectResourceTest.java | 3 +- 21 files changed, 211 insertions(+), 204 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 0843deb1a30..3c3e46f49a3 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -469,6 +469,9 
@@ + + + @@ -482,6 +485,8 @@ + + diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java index 7f1fe2841a3..a5abeff40ce 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java @@ -22,7 +22,9 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServerConfig; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; +import org.glassfish.hk2.api.TypeLiteral; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; import java.util.Arrays; import java.util.Collection; @@ -45,15 +47,28 @@ public class MirrorRestServer extends RestServer { } @Override -protected Collection regularResources() { +protected Collection> regularResources() { return Arrays.asList( -new InternalMirrorResource(herders, restClient) +InternalMirrorResource.class ); } @Override -protected Collection adminResources() { +protected Collection> adminResources() { return Collections.emptyList(); } +@Override +protected void configureRegularResources(ResourceConfig resourceConfig) { +resourceConfig.register(new Binder()); +} + +private class Binder extends AbstractBinder { +@Override +protected void configure() { +bind(herders).to(new TypeLiteral>() { }); +bind(restClient).to(RestClient.class); +} +} + } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java index 8b5150f56ac..5c46bd9c6c5 100644 --- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java @@ -19,10 +19,12 @@ package org.apache.kafka.connect.mirror.rest.resources; import org.apache.kafka.connect.mirror.SourceAndTarget; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.RestCli
(kafka) branch trunk updated: KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dbf00bcf45d KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149) dbf00bcf45d is described below commit dbf00bcf45d30f2c9567585f15121fe46b030d4c Author: Chris Egerton AuthorDate: Wed Jan 10 09:03:23 2024 -0500 KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149) Reviewers: Sagar Rao , Greg Harris --- checkstyle/import-control.xml | 5 + .../connect/mirror/rest/MirrorRestServer.java | 23 - .../rest/resources/InternalMirrorResource.java | 11 +- .../org/apache/kafka/connect/runtime/Worker.java | 10 +- .../connect/runtime/rest/ConnectRestServer.java| 31 -- .../connect/runtime/rest/HerderRequestHandler.java | 15 +-- ...onnectResource.java => RestRequestTimeout.java} | 25 ++--- .../kafka/connect/runtime/rest/RestServer.java | 111 ++--- .../runtime/rest/resources/ConnectResource.java| 40 .../rest/resources/ConnectorPluginsResource.java | 19 ++-- .../runtime/rest/resources/ConnectorsResource.java | 21 ++-- .../rest/resources/InternalClusterResource.java| 12 +-- .../rest/resources/InternalConnectResource.java| 7 +- .../runtime/rest/resources/LoggingResource.java| 10 +- .../runtime/rest/resources/RootResource.java | 10 +- .../connect/integration/BlockingConnectorTest.java | 4 +- .../integration/ConnectWorkerIntegrationTest.java | 2 +- .../apache/kafka/connect/runtime/WorkerTest.java | 8 +- .../runtime/rest/ConnectRestServerTest.java| 35 --- .../resources/ConnectorPluginsResourceTest.java| 5 +- .../rest/resources/ConnectorsResourceTest.java | 16 +-- .../resources/InternalConnectResourceTest.java | 3 +- 22 files changed, 219 insertions(+), 204 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 39a77326bde..b43840f5979 100644 --- 
a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -516,6 +516,9 @@ + + + @@ -530,6 +533,8 @@ + + diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java index 7f1fe2841a3..a5abeff40ce 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java @@ -22,7 +22,9 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServerConfig; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; +import org.glassfish.hk2.api.TypeLiteral; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; import java.util.Arrays; import java.util.Collection; @@ -45,15 +47,28 @@ public class MirrorRestServer extends RestServer { } @Override -protected Collection regularResources() { +protected Collection> regularResources() { return Arrays.asList( -new InternalMirrorResource(herders, restClient) +InternalMirrorResource.class ); } @Override -protected Collection adminResources() { +protected Collection> adminResources() { return Collections.emptyList(); } +@Override +protected void configureRegularResources(ResourceConfig resourceConfig) { +resourceConfig.register(new Binder()); +} + +private class Binder extends AbstractBinder { +@Override +protected void configure() { +bind(herders).to(new TypeLiteral>() { }); +bind(restClient).to(RestClient.class); +} +} + } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java 
index 8b5150f56ac..5c46bd9c6c5 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java @@ -19,10 +19,12 @@ package org.apache.kafka.connect.mirror.rest.resources; import org.apache.kafka.connect.mirror.SourceAndTarget; import org.apache.kafka.connect.runtime.Her
(kafka) branch trunk updated: KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new c7e1fdca64d KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966) c7e1fdca64d is described below commit c7e1fdca64dcbd8cedf8aaf826c22566b7485dad Author: Chris Egerton AuthorDate: Tue Jan 9 10:32:39 2024 -0500 KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966) Reviewers: Sudesh Wasnik , Sagar Rao , Yash Mayya , Greg Harris --- .../ConnectorRestartApiIntegrationTest.java| 6 +- .../integration/OffsetsApiIntegrationTest.java | 464 - 2 files changed, 267 insertions(+), 203 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java index 26b4eb11417..a512eeaae0a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java @@ -73,7 +73,7 @@ public class ConnectorRestartApiIntegrationTest { private static final String TOPIC_NAME = "test-topic"; -private static Map connectClusterMap = new ConcurrentHashMap<>(); +private static final Map CONNECT_CLUSTERS = new ConcurrentHashMap<>(); private EmbeddedConnectCluster connect; private ConnectorHandle connectorHandle; @@ -91,7 +91,7 @@ public class ConnectorRestartApiIntegrationTest { } private void startOrReuseConnectWithNumWorkers(int numWorkers) throws Exception { -connect = connectClusterMap.computeIfAbsent(numWorkers, n -> { +connect = CONNECT_CLUSTERS.computeIfAbsent(numWorkers, n -> { // setup Connect worker properties Map 
workerProps = new HashMap<>(); workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); @@ -125,7 +125,7 @@ public class ConnectorRestartApiIntegrationTest { @AfterClass public static void close() { // stop all Connect, Kafka and Zk threads. -connectClusterMap.values().forEach(EmbeddedConnectCluster::stop); +CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index ad2a5f168ff..cc92effb778 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -30,22 +30,31 @@ import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; +import org.apache.kafka.test.NoRetryException; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; @@ -56,6 +65,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.containsString; import static org
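The `CONNECT_CLUSTERS.computeIfAbsent(numWorkers, ...)` call in the `ConnectorRestartApiIntegrationTest` hunk above is the cluster-reuse idiom this commit is named for. A rough sketch of that idiom with a stand-in type is shown here; `ClusterCache`, `FakeCluster`, and the `STARTS` counter are hypothetical and exist only so the example runs without Kafka on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of reusing expensive fixtures across test cases
public class ClusterCache {
    // Stand-in for an embedded cluster that is costly to start
    public static final class FakeCluster {
        public final int numWorkers;
        public volatile boolean running = true;

        FakeCluster(int numWorkers) {
            this.numWorkers = numWorkers;
        }

        public void stop() {
            running = false;
        }
    }

    private static final Map<Integer, FakeCluster> CLUSTERS = new ConcurrentHashMap<>();
    // Counts how many clusters were actually started
    public static final AtomicInteger STARTS = new AtomicInteger();

    // Start a cluster for this worker count only if none exists yet
    public static FakeCluster startOrReuse(int numWorkers) {
        return CLUSTERS.computeIfAbsent(numWorkers, n -> {
            STARTS.incrementAndGet();
            return new FakeCluster(n);
        });
    }

    // Counterpart of an @AfterClass hook: stop everything once per suite
    public static void stopAll() {
        CLUSTERS.values().forEach(FakeCluster::stop);
        CLUSTERS.clear();
    }
}
```

Keying the map by worker count means that test cases needing the same topology share one instance, while cases needing a different topology still get their own.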
(kafka) branch trunk updated (9de72daa509 -> dc857fb6bf6)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 9de72daa509 KAFKA-15361: Migrating brokers must register with directory list (#14976)
     add dc857fb6bf6 KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes (#14304)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/kafka/common/utils/Utils.java | 42 ++
 .../kafka/connect/runtime/ConnectorConfig.java | 42 +++---
 .../connect/util/ConcreteSubClassValidator.java} | 26 +++-
 .../connect/util/InstantiableClassValidator.java | 47 ++
 .../ConnectorValidationIntegrationTest.java| 161 +
 .../kafka/connect/runtime/ConnectorConfigTest.java | 38 ++---
 6 files changed, 307 insertions(+), 49 deletions(-)
 copy connect/{transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java => runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java} (54%)
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java
(kafka) branch trunk updated: KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest (#12728)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 964e73178b5 KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest (#12728) 964e73178b5 is described below commit 964e73178b5b8363cd2685ce6872905ef0c04dee Author: Matthew de Detrich AuthorDate: Thu Dec 7 16:01:17 2023 +0100 KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest (#12728) Reviewers: Mickael Maison , Chris Egerton --- build.gradle | 1 - .../runtime/standalone/StandaloneHerderTest.java | 779 + 2 files changed, 321 insertions(+), 459 deletions(-) diff --git a/build.gradle b/build.gradle index 080daf28ac9..f5e8283cdc6 100644 --- a/build.gradle +++ b/build.gradle @@ -419,7 +419,6 @@ subprojects { testsToExclude.addAll([ // connect tests "**/KafkaConfigBackingStoreTest.*", - "**/StandaloneHerderTest.*", "**/WorkerSinkTaskTest.*" ]) } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 7e4126aa141..213f029a0b2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfigTransformer; -import org.apache.kafka.connect.runtime.WorkerConnector; import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.runtime.rest.entities.Message; @@ 
-60,15 +59,13 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; -import org.easymock.Capture; -import org.easymock.EasyMock; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import java.util.ArrayList; import java.util.Arrays; @@ -88,19 +85,30 @@ import static java.util.Collections.singletonMap; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.isNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - -@RunWith(PowerMockRunner.class) +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.isNull; +import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) @SuppressWarnings("unchecked") -@PrepareForTest({StandaloneHerder.class, WorkerConnector.class}) public class StandaloneHerderTest { private static final String CONNECTOR_NAME = "test"; private static final String TOPICS_LIST_STR = "topic1,topic2"; @@ -114,7 +122,8 @@ public class StandaloneHerderTest { private StandaloneHerder herder; private Connector connector; -@Mock protected Worker
(kafka) branch 3.5 updated: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2 (#14235)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.5 by this push: new ec5d36368bb KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2 (#14235) ec5d36368bb is described below commit ec5d36368bb37a1bf84b496b45d7501eb4702197 Author: Chris Egerton AuthorDate: Tue Dec 5 12:24:06 2023 -0500 KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2 (#14235) Reviewers: Mickael Maison --- docs/ops.html | 4 docs/upgrade.html | 9 + 2 files changed, 13 insertions(+) diff --git a/docs/ops.html b/docs/ops.html index 987912aa7f2..86b5e548aa7 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. --> + Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of. 
@@ -785,6 +786,9 @@ us-west->us-east.emit.heartbeats = false <li><code>sync.group.offsets.enabled</code>: whether to periodically write the translated offsets of replicated consumer groups (in the source cluster) to <code>__consumer_offsets</code> topic in target cluster, as long as no active consumers in that group are connected to the target cluster (default: false) <li><code>sync.group.offsets.interval.seconds</code>: frequency at which consumer group offsets are synced (default: 60, every minute) <li><code>offset-syncs.topic.replication.factor</code>: replication factor of MirrorMaker's internal offset-sync topics (default: 3) +<li id="georeplication-replication-policy"><code>replication.policy</code>: the <a href="/{{version}}/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html">ReplicationPolicy</a> class to use, which defaults to <a href="/{{version}}/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html">DefaultReplicationPolicy</a> +<li><code>replication.policy.separator</code>: the delimiter to use between cluster aliases and topic names, if using the <a href="/{{version}}/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html">DefaultReplicationPolicy</a> class +<li id="georeplication-replication-policy-internal-topic-separator-enabled"><code>replication.policy.internal.topic.separator.enabled</code>: whether to use <code>replication.policy.separator</code> to control the names of topics used for checkpoints and offset syncs, if using the <a href="/{{version}}/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html">DefaultReplicationPolicy</a> class. By default, custom separators are used in these topic names; however, if upgr [...] 
</ul> <h5 class="anchor-heading"><a id="georeplication-flow-secure" class="anchor-link"></a><a href="#georeplication-flow-secure">Securing Replication Flows</a></h5> diff --git a/docs/upgrade.html b/docs/upgrade.html index a44d63979df..3f4d66e0298 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -18,6 +18,15 @@ <script><!--#include virtual="js/templateData.js" -->
(kafka) branch 3.6 updated: KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new d67f6872892 KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567) d67f6872892 is described below commit d67f6872892e2a2c76f54c4baafbc5f5bccd33a7 Author: Chris Egerton AuthorDate: Mon Dec 4 16:37:37 2023 -0500 KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567) Reviewers: hudeqi <1217150...@qq.com>, Federico Valeri , Greg Harris --- .../kafka/connect/mirror/MirrorSourceTask.java | 33 +--- .../kafka/connect/mirror/MirrorSourceTaskTest.java | 63 ++ .../MirrorConnectorsIntegrationBaseTest.java | 29 ++ .../ExactlyOnceSourceIntegrationTest.java | 8 +-- .../util/clusters/EmbeddedKafkaCluster.java| 13 + 5 files changed, 132 insertions(+), 14 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 84e393edb36..cad57d4ad02 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -103,12 +103,8 @@ public class MirrorSourceTask extends SourceTask { consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer")); offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig()); Set taskTopicPartitions = config.taskTopicPartitions(); -Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); -consumer.assign(topicPartitionOffsets.keySet()); -log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() -.filter(x -> x.getValue() == 0L).count()); -log.trace("Seeking offsets: {}", topicPartitionOffsets); 
-topicPartitionOffsets.forEach(consumer::seek); +initializeConsumer(taskTopicPartitions); + log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(), taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions); } @@ -266,7 +262,26 @@ public class MirrorSourceTask extends SourceTask { private Long loadOffset(TopicPartition topicPartition) { Map wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias); Map wrappedOffset = context.offsetStorageReader().offset(wrappedPartition); -return MirrorUtils.unwrapOffset(wrappedOffset) + 1; +return MirrorUtils.unwrapOffset(wrappedOffset); +} + +// visible for testing +void initializeConsumer(Set taskTopicPartitions) { +Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); +consumer.assign(topicPartitionOffsets.keySet()); +log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() +.filter(this::isUncommitted).count()); + +topicPartitionOffsets.forEach((topicPartition, offset) -> { +// Do not call seek on partitions that don't have an existing offset committed. 
+if (isUncommitted(offset)) { +log.trace("Skipping seeking offset for topicPartition: {}", topicPartition); +return; +} +long nextOffsetToCommittedOffset = offset + 1L; +log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition); +consumer.seek(topicPartition, nextOffsetToCommittedOffset); +}); } // visible for testing @@ -302,6 +317,10 @@ public class MirrorSourceTask extends SourceTask { } } +private boolean isUncommitted(Long offset) { +return offset == null || offset < 0; +} + static class PartitionState { long previousUpstreamOffset = -1L; long previousDownstreamOffset = -1L; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 0c566eb596b..647935eb356 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -31,25 +31,33 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.s
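The `initializeConsumer` and `isUncommitted` changes in the `MirrorSourceTask` diff above amount to a small decision rule: partitions with no stored offset are not seeked at all (so, presumably, the consumer's own offset reset policy decides where to start), and every other partition resumes one past its last stored offset. A minimal sketch of that rule in plain Java follows. `SeekPlanner` and `seekTargets` are hypothetical names, and partitions are represented as strings rather than `TopicPartition` so the example needs no Kafka classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the seek decision; not MirrorSourceTask itself
public class SeekPlanner {
    // Same check as in the diff: null or negative means nothing was committed
    public static boolean isUncommitted(Long offset) {
        return offset == null || offset < 0;
    }

    // Returns the position to seek to for each partition that has a stored offset;
    // partitions without one are left out, meaning "do not call seek"
    public static Map<String, Long> seekTargets(Map<String, Long> storedOffsets) {
        Map<String, Long> targets = new LinkedHashMap<>();
        storedOffsets.forEach((partition, offset) -> {
            if (isUncommitted(offset)) {
                return; // skip: no seek for this partition
            }
            // The stored offset is the last record handled, so resume at the next one
            targets.put(partition, offset + 1L);
        });
        return targets;
    }
}
```

Note that a stored offset of 0 counts as committed here and yields a seek to offset 1, whereas the removed code added 1 inside `loadOffset` and treated a resulting 0 as "uncommitted".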
(kafka) branch trunk updated: KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a83bc2d977d KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567) a83bc2d977d is described below commit a83bc2d977d2af85d4edfc8096854137481001e9 Author: Chris Egerton AuthorDate: Mon Dec 4 16:37:37 2023 -0500 KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567) Reviewers: hudeqi <1217150...@qq.com>, Federico Valeri , Greg Harris --- .../kafka/connect/mirror/MirrorSourceTask.java | 33 +--- .../kafka/connect/mirror/MirrorSourceTaskTest.java | 63 ++ .../MirrorConnectorsIntegrationBaseTest.java | 29 ++ 3 files changed, 118 insertions(+), 7 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 84e393edb36..cad57d4ad02 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -103,12 +103,8 @@ public class MirrorSourceTask extends SourceTask { consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer")); offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig()); Set taskTopicPartitions = config.taskTopicPartitions(); -Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); -consumer.assign(topicPartitionOffsets.keySet()); -log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() -.filter(x -> x.getValue() == 0L).count()); -log.trace("Seeking offsets: {}", topicPartitionOffsets); -topicPartitionOffsets.forEach(consumer::seek); +initializeConsumer(taskTopicPartitions); + log.info("{} replicating {} 
topic-partitions {}->{}: {}.", Thread.currentThread().getName(), taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions); } @@ -266,7 +262,26 @@ public class MirrorSourceTask extends SourceTask { private Long loadOffset(TopicPartition topicPartition) { Map wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias); Map wrappedOffset = context.offsetStorageReader().offset(wrappedPartition); -return MirrorUtils.unwrapOffset(wrappedOffset) + 1; +return MirrorUtils.unwrapOffset(wrappedOffset); +} + +// visible for testing +void initializeConsumer(Set taskTopicPartitions) { +Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); +consumer.assign(topicPartitionOffsets.keySet()); +log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() +.filter(this::isUncommitted).count()); + +topicPartitionOffsets.forEach((topicPartition, offset) -> { +// Do not call seek on partitions that don't have an existing offset committed. 
+if (isUncommitted(offset)) { +log.trace("Skipping seeking offset for topicPartition: {}", topicPartition); +return; +} +long nextOffsetToCommittedOffset = offset + 1L; +log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition); +consumer.seek(topicPartition, nextOffsetToCommittedOffset); +}); } // visible for testing @@ -302,6 +317,10 @@ public class MirrorSourceTask extends SourceTask { } } +private boolean isUncommitted(Long offset) { +return offset == null || offset < 0; +} + static class PartitionState { long previousUpstreamOffset = -1L; long previousDownstreamOffset = -1L; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 0c566eb596b..647935eb356 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -31,25 +31,33 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageR
(kafka) branch 3.5 updated: KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.5 by this push:
     new 3c8ca01cefb KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
3c8ca01cefb is described below

commit 3c8ca01cefb4bbaec787b716cd929e4e7da1b512
Author: Chris Egerton
AuthorDate: Thu Nov 9 10:48:43 2023 -0500

    KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)

    Reviewers: Sagar Rao, Yash Mayya
---
 .../IncrementalCooperativeAssignor.java | 23 ---
 .../runtime/distributed/WorkerCoordinator.java | 6 ++
 .../runtime/distributed/DistributedHerderTest.java | 2 +-
 .../IncrementalCooperativeAssignorTest.java | 77 ++
 4 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index d48589423dc..0836bf2c4ba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -48,6 +48,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
 import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
@@ -337,10 +338,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
         // The complete set of connectors and tasks that should be newly-assigned during this round
         ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
-                .addConnectors(created.connectors())
-                .addTasks(created.tasks())
-                .addConnectors(lostAssignmentsToReassign.connectors())
-                .addTasks(lostAssignmentsToReassign.tasks())
+                .addAll(created)
+                .addAll(lostAssignmentsToReassign)
                 .build();
 
         assignConnectors(nextWorkerAssignment, toAssign.connectors());
@@ -460,8 +459,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                     + "missing assignments that the leader is detecting are probably due to some "
                     + "workers failing to receive the new assignments in the previous rebalance. "
                     + "Will reassign missing tasks as new tasks");
-            lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-            lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+            lostAssignmentsToReassign.addAll(lostAssignments);
+            return;
+        } else if (maxDelay == 0) {
+            log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+                    + "reassigning all lost connectors and tasks immediately",
+                    SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
+            );
+            lostAssignmentsToReassign.addAll(lostAssignments);
             return;
         }
 
@@ -498,8 +503,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 }
             } else {
                 log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
-                lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-                lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+                lostAssignmentsToReassign.addAll(lostAssignments);
             }
             resetDelay();
             // Resetting the flag as now we can permit successive revoking rebalances.
@@ -840,8 +844,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private static void addAll(Map base, Map toAdd) {
         toAdd.forEach((worker, assignment) -> base
                 .computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
-                .addConnectors(assignment.connectors())
-                .addTasks(assignment.tasks())
+                .addAll(assignment)
         );
     }
 
diff --git a/connect/
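Note on the KAFKA-15693 diff above: two things change in IncrementalCooperativeAssignor. Paired addConnectors(...)/addTasks(...) calls collapse into a single addAll(...), and a new branch reassigns every lost connector and task at once when the maximum scheduled rebalance delay is 0. The following is a minimal stand-alone sketch of both ideas, not the Connect runtime code: ReassignSketch, its nested Builder (a hypothetical stand-in for ConnectorsAndTasks.Builder), and lostToReassign are names assumed for illustration, and the delay-based path for a non-zero maxDelay is deliberately not modelled.

```java
import java.util.ArrayList;
import java.util.List;

class ReassignSketch {
    // Hypothetical stand-in for ConnectorsAndTasks.Builder; names are plain strings here.
    static final class Builder {
        final List<String> connectors = new ArrayList<>();
        final List<String> tasks = new ArrayList<>();

        Builder addConnectors(List<String> toAdd) {
            connectors.addAll(toAdd);
            return this;
        }

        Builder addTasks(List<String> toAdd) {
            tasks.addAll(toAdd);
            return this;
        }

        // One call in place of addConnectors(...).addTasks(...).
        Builder addAll(Builder other) {
            return addConnectors(other.connectors).addTasks(other.tasks);
        }
    }

    // Returns what is reassigned in this round. Only the "delay disabled" branch
    // from the diff is modelled; a non-zero delay returns an empty builder because
    // the scheduled-delay logic is outside the scope of this sketch.
    static Builder lostToReassign(Builder lost, int maxDelayMs) {
        Builder toReassign = new Builder();
        if (maxDelayMs == 0) {
            toReassign.addAll(lost);
        }
        return toReassign;
    }

    public static void main(String[] args) {
        Builder lost = new Builder()
                .addConnectors(List.of("c1"))
                .addTasks(List.of("c1-0", "c1-1"));
        Builder now = lostToReassign(lost, 0);
        System.out.println(now.connectors + " " + now.tasks);
    }
}
```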
(kafka) branch 3.6 updated: KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.6 by this push:
     new 6b4ba0eb625 KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
6b4ba0eb625 is described below

commit 6b4ba0eb6252f38e95fd25ac7b678e0a5b3f455f
Author: Chris Egerton
AuthorDate: Thu Nov 9 10:48:43 2023 -0500

    KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)

    Reviewers: Sagar Rao, Yash Mayya
---
 .../IncrementalCooperativeAssignor.java | 23 ---
 .../runtime/distributed/WorkerCoordinator.java | 6 ++
 .../IncrementalCooperativeAssignorTest.java | 77 ++
 3 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index d48589423dc..0836bf2c4ba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -48,6 +48,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
 import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
@@ -337,10 +338,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
         // The complete set of connectors and tasks that should be newly-assigned during this round
         ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
-                .addConnectors(created.connectors())
-                .addTasks(created.tasks())
-                .addConnectors(lostAssignmentsToReassign.connectors())
-                .addTasks(lostAssignmentsToReassign.tasks())
+                .addAll(created)
+                .addAll(lostAssignmentsToReassign)
                 .build();
 
         assignConnectors(nextWorkerAssignment, toAssign.connectors());
@@ -460,8 +459,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                     + "missing assignments that the leader is detecting are probably due to some "
                     + "workers failing to receive the new assignments in the previous rebalance. "
                     + "Will reassign missing tasks as new tasks");
-            lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-            lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+            lostAssignmentsToReassign.addAll(lostAssignments);
+            return;
+        } else if (maxDelay == 0) {
+            log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+                    + "reassigning all lost connectors and tasks immediately",
+                    SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
+            );
+            lostAssignmentsToReassign.addAll(lostAssignments);
             return;
         }
 
@@ -498,8 +503,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 }
             } else {
                 log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
-                lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-                lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+                lostAssignmentsToReassign.addAll(lostAssignments);
             }
             resetDelay();
             // Resetting the flag as now we can permit successive revoking rebalances.
@@ -840,8 +844,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private static void addAll(Map base, Map toAdd) {
         toAdd.forEach((worker, assignment) -> base
                 .computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
-                .addConnectors(assignment.connectors())
-                .addTasks(assignment.tasks())
+                .addAll(assignment)
         );
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/W
(kafka) branch trunk updated: KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 39c6170aa96 KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
39c6170aa96 is described below

commit 39c6170aa96e4c9840ac469d1b43bb059f0513af
Author: Chris Egerton
AuthorDate: Thu Nov 9 10:48:43 2023 -0500

    KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)

    Reviewers: Sagar Rao, Yash Mayya
---
 .../IncrementalCooperativeAssignor.java | 23 ---
 .../runtime/distributed/WorkerCoordinator.java | 6 ++
 .../IncrementalCooperativeAssignorTest.java | 77 ++
 3 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index d48589423dc..0836bf2c4ba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -48,6 +48,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
 import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
@@ -337,10 +338,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
         // The complete set of connectors and tasks that should be newly-assigned during this round
         ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
-                .addConnectors(created.connectors())
-                .addTasks(created.tasks())
-                .addConnectors(lostAssignmentsToReassign.connectors())
-                .addTasks(lostAssignmentsToReassign.tasks())
+                .addAll(created)
+                .addAll(lostAssignmentsToReassign)
                 .build();
 
         assignConnectors(nextWorkerAssignment, toAssign.connectors());
@@ -460,8 +459,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                     + "missing assignments that the leader is detecting are probably due to some "
                     + "workers failing to receive the new assignments in the previous rebalance. "
                     + "Will reassign missing tasks as new tasks");
-            lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-            lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+            lostAssignmentsToReassign.addAll(lostAssignments);
+            return;
+        } else if (maxDelay == 0) {
+            log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+                    + "reassigning all lost connectors and tasks immediately",
+                    SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
+            );
+            lostAssignmentsToReassign.addAll(lostAssignments);
             return;
         }
 
@@ -498,8 +503,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 }
             } else {
                 log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
-                lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-                lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+                lostAssignmentsToReassign.addAll(lostAssignments);
             }
             resetDelay();
             // Resetting the flag as now we can permit successive revoking rebalances.
@@ -840,8 +844,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private static void addAll(Map base, Map toAdd) {
         toAdd.forEach((worker, assignment) -> base
                 .computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
-                .addConnectors(assignment.connectors())
-                .addTasks(assignment.tasks())
+                .addAll(assignment)
         );
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distri
(kafka) branch 3.6 updated: KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.6 by this push:
     new 264e983f215 KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)
264e983f215 is described below

commit 264e983f215342471dce7fda66642efe4d049347
Author: kumarpritam863 <148938310+kumarpritam...@users.noreply.github.com>
AuthorDate: Mon Nov 6 22:02:05 2023 +0530

    KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)

    Reviewers: Chris Egerton
---
 .../kafka/clients/consumer/MockConsumer.java | 6 ++--
 .../kafka/clients/consumer/MockConsumerTest.java | 3 ++
 .../kafka/connect/runtime/WorkerSinkTask.java | 18 +++
 .../connect/runtime/WorkerSinkTaskContext.java | 6 ++--
 .../kafka/connect/runtime/WorkerSinkTaskTest.java | 36 +++---
 5 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index df6a34737e6..ce112c82a16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -104,15 +104,13 @@ public class MockConsumer implements Consumer {
 
         // rebalance
         this.records.clear();
-        this.subscriptions.assignFromSubscribed(newAssignment);
 
         // rebalance callbacks
-        if (!added.isEmpty()) {
-            this.subscriptions.rebalanceListener().onPartitionsAssigned(added);
-        }
         if (!removed.isEmpty()) {
             this.subscriptions.rebalanceListener().onPartitionsRevoked(removed);
         }
+        this.subscriptions.assignFromSubscribed(newAssignment);
+        this.subscriptions.rebalanceListener().onPartitionsAssigned(added);
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 17247ba79b1..c03be92fb9a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -154,6 +154,9 @@ public class MockConsumerTest {
 
         @Override
         public void onPartitionsAssigned(Collection partitions) {
+            if (partitions.isEmpty()) {
+                return;
+            }
             assigned.clear();
             assigned.addAll(partitions);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 7b514016c37..56181bd27de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -19,9 +19,9 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -85,7 +85,7 @@ class WorkerSinkTask extends WorkerTask {
     private final TransformationChain transformationChain;
     private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
     private final boolean isTopicTrackingEnabled;
-    private final KafkaConsumer consumer;
+    private final Consumer consumer;
     private WorkerSinkTaskContext context;
     private final List messageBatch;
     private final Map lastCommittedOffsets;
@@ -114,7 +114,7 @@ class WorkerSinkTask extends WorkerTask {
                           ErrorHandlingMetrics errorMetrics,
                           HeaderConverter headerConverter,
                           TransformationChain transformationChain,
-                          KafkaConsumer consumer,
+                          Consumer consumer,
                           ClassLoader loader,
                           Time time,
                           RetryWithToleranceOperator retryWithToleranceOperator,
@@ -184,6 +184,14 @@ class WorkerSinkTask extends WorkerTask {
         Utils.closeQuietly(transformationChain, "transformation chain");
         Ut
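Note on the MockConsumer hunk above: the rebalance callbacks are reordered so that onPartitionsRevoked fires first (only when something was removed), the assignment is then updated, and onPartitionsAssigned is always invoked, even with an empty collection. The ordering can be sketched without any Kafka classes as below. This is a minimal stand-alone sketch under stated assumptions: RebalanceOrderSketch, its Listener interface, and the trace helper are hypothetical names, and partitions are plain strings.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RebalanceOrderSketch {
    // Hypothetical listener with the same two callbacks the diff touches.
    interface Listener {
        void onPartitionsRevoked(Set<String> partitions);
        void onPartitionsAssigned(Set<String> partitions);
    }

    // Applies the new assignment and fires callbacks in the order used after the change.
    static Set<String> rebalance(Set<String> current, Set<String> next, Listener listener) {
        Set<String> added = new HashSet<>(next);
        added.removeAll(current);
        Set<String> removed = new HashSet<>(current);
        removed.removeAll(next);

        if (!removed.isEmpty()) {
            listener.onPartitionsRevoked(removed);
        }
        Set<String> updated = new HashSet<>(next); // assignment is updated after revocation
        listener.onPartitionsAssigned(added);      // always called, possibly with an empty set
        return updated;
    }

    // Records which callbacks fired, in order, with the number of partitions passed.
    static List<String> trace(Set<String> current, Set<String> next) {
        List<String> events = new ArrayList<>();
        rebalance(current, next, new Listener() {
            @Override
            public void onPartitionsRevoked(Set<String> partitions) {
                events.add("revoked:" + partitions.size());
            }

            @Override
            public void onPartitionsAssigned(Set<String> partitions) {
                events.add("assigned:" + partitions.size());
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(Set.of("t-0", "t-1"), Set.of("t-1")));
    }
}
```

Because the assigned callback may now receive an empty collection, listeners that reset state on assignment need a guard, which is what the MockConsumerTest hunk adds with its partitions.isEmpty() check.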
(kafka) branch trunk updated: KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new ca05da95c33 KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)
ca05da95c33 is described below

commit ca05da95c337f051e721464588d1b6678e8f2afb
Author: kumarpritam863 <148938310+kumarpritam...@users.noreply.github.com>
AuthorDate: Mon Nov 6 22:02:05 2023 +0530

    KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)

    Reviewers: Chris Egerton
---
 .../kafka/clients/consumer/MockConsumer.java | 6 ++--
 .../kafka/clients/consumer/MockConsumerTest.java | 3 ++
 .../kafka/connect/runtime/WorkerSinkTask.java | 18 +++
 .../connect/runtime/WorkerSinkTaskContext.java | 6 ++--
 .../kafka/connect/runtime/WorkerSinkTaskTest.java | 36 +++---
 5 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 53c13e4b98b..126d5eff02a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -104,15 +104,13 @@ public class MockConsumer implements Consumer {
 
         // rebalance
         this.records.clear();
-        this.subscriptions.assignFromSubscribed(newAssignment);
 
         // rebalance callbacks
-        if (!added.isEmpty()) {
-            this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsAssigned(added));
-        }
         if (!removed.isEmpty()) {
             this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsRevoked(removed));
         }
+        this.subscriptions.assignFromSubscribed(newAssignment);
+        this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsAssigned(added));
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 17247ba79b1..c03be92fb9a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -154,6 +154,9 @@ public class MockConsumerTest {
 
         @Override
         public void onPartitionsAssigned(Collection partitions) {
+            if (partitions.isEmpty()) {
+                return;
+            }
             assigned.clear();
             assigned.addAll(partitions);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 4d400260081..0f2e8f6eb27 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -19,9 +19,9 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -85,7 +85,7 @@ class WorkerSinkTask extends WorkerTask {
     private final TransformationChain transformationChain;
     private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
     private final boolean isTopicTrackingEnabled;
-    private final KafkaConsumer consumer;
+    private final Consumer consumer;
     private WorkerSinkTaskContext context;
     private final List messageBatch;
     private final Map lastCommittedOffsets;
@@ -114,7 +114,7 @@ class WorkerSinkTask extends WorkerTask {
                           ErrorHandlingMetrics errorMetrics,
                           HeaderConverter headerConverter,
                           TransformationChain transformationChain,
-                          KafkaConsumer consumer,
+                          Consumer consumer,
                           ClassLoader loader,
                           Time time,
                           RetryWithToleranceOperator retryWithToleranceOperator,
@@ -184,6 +184,14 @@ class WorkerSinkTask extends WorkerTask {
         Utils.closeQuietly(tran
[kafka] branch 3.5 updated: KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.5 by this push:
     new add9dc3340f KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)
add9dc3340f is described below

commit add9dc3340fde5c4bd74b1459e89309f51c233a0
Author: hudeqi <1217150...@qq.com>
AuthorDate: Sat Oct 21 00:17:51 2023 +0800

    KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)

    Reviewers: Chris Egerton
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java | 19 +++
 .../connect/mirror/MirrorCheckpointTaskTest.java | 27 ++
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index a51846766a6..0bf2baa9d85 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -330,11 +330,20 @@ public class MirrorCheckpointTask extends SourceTask {
 
                 // if translated offset from upstream is smaller than the current consumer offset
                 // in the target, skip updating the offset for that partition
-                long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
-                if (latestDownstreamOffset >= convertedOffset.offset()) {
-                    log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
-                            + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
-                    continue;
+                OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition);
+                if (targetOffsetAndMetadata != null) {
+                    long latestDownstreamOffset = targetOffsetAndMetadata.offset();
+                    if (latestDownstreamOffset >= convertedOffset.offset()) {
+                        log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+                                + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
+                        continue;
+                    }
+                } else {
+                    // It is possible that when resetting offsets are performed in the java kafka client, the reset to -1 will be intercepted.
+                    // However, there are some other types of clients such as sarama, which can magically reset the group offset to -1, which will cause
+                    // `targetOffsetAndMetadata` here is null. For this case, just sync the offset to target.
+                    log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.",
+                            consumerGroupId, topicPartition);
                 }
                 offsetToSync.put(topicPartition, convertedOffset);
             }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index f9bc7bc76cb..8d8e8bd3a0d 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -169,6 +169,33 @@ public class MirrorCheckpointTaskTest {
                 "Consumer 2 " + topic2 + " failed");
     }
 
+    @Test
+    public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() {
+        Map> idleConsumerGroupsOffset = new HashMap<>();
+        Map> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer = "consumer";
+        String topic = "topic";
+        Map ct = new HashMap<>();
+        TopicPartition tp = new TopicPartition(topic, 0);
+        // Simulate other clients such as Sarama, which may reset group offsets to -1. This can cause
+        // the obtained `OffsetAndMetadata` of the target cluster to be null.
+        ct.put(tp, null);
+        idleConsumerGroupsOffset.put(consumer, ct);
+
+        Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200, 101, "metadata");
+        Map checkpointMap = new HashMap<>();
+        checkpointMap.put(cp.topicPartition(), cp);
+        checkpointsPerConsumerGroup.put(consumer, checkpointMap);
+
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("
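Note on the KAFKA-15607 diff above: the fix replaces an unconditional targetConsumerOffset.get(topicPartition).offset() with a null check, so a partition whose target-side offset entry is null is synced instead of triggering a NullPointerException. The decision rule can be sketched in isolation as below. This is a minimal stand-alone sketch, not the MirrorCheckpointTask code: OffsetSyncSketch and offsetsToSync are hypothetical names, partitions are plain strings, and offsets are bare Long values rather than OffsetAndMetadata.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetSyncSketch {
    // Returns the converted upstream offsets that should be written to the target group.
    static Map<String, Long> offsetsToSync(Map<String, Long> converted, Map<String, Long> target) {
        Map<String, Long> toSync = new HashMap<>();
        converted.forEach((partition, convertedOffset) -> {
            Long latestDownstream = target.get(partition); // may be null, as in the bug report
            if (latestDownstream != null && latestDownstream >= convertedOffset) {
                return; // target is already at or ahead of the translated offset: skip
            }
            // Either the target is behind, or it has no usable offset: sync it.
            toSync.put(partition, convertedOffset);
        });
        return toSync;
    }

    public static void main(String[] args) {
        Map<String, Long> target = new HashMap<>();
        target.put("topic-0", null);  // offset entry present but null
        target.put("topic-1", 500L);  // target already ahead
        Map<String, Long> converted = Map.of("topic-0", 101L, "topic-1", 101L);
        System.out.println(offsetsToSync(converted, target));
    }
}
```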
[kafka] branch 3.6 updated: KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.6 by this push:
     new 9fca00871ab KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)
9fca00871ab is described below

commit 9fca00871ab2c218b797ef44213d80a3dc1f2098
Author: hudeqi <1217150...@qq.com>
AuthorDate: Sat Oct 21 00:17:51 2023 +0800

    KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)

    Reviewers: Chris Egerton
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java | 19 +++
 .../connect/mirror/MirrorCheckpointTaskTest.java | 27 ++
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index a51846766a6..0bf2baa9d85 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -330,11 +330,20 @@ public class MirrorCheckpointTask extends SourceTask {
 
                 // if translated offset from upstream is smaller than the current consumer offset
                 // in the target, skip updating the offset for that partition
-                long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
-                if (latestDownstreamOffset >= convertedOffset.offset()) {
-                    log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
-                            + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
-                    continue;
+                OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition);
+                if (targetOffsetAndMetadata != null) {
+                    long latestDownstreamOffset = targetOffsetAndMetadata.offset();
+                    if (latestDownstreamOffset >= convertedOffset.offset()) {
+                        log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+                                + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
+                        continue;
+                    }
+                } else {
+                    // It is possible that when resetting offsets are performed in the java kafka client, the reset to -1 will be intercepted.
+                    // However, there are some other types of clients such as sarama, which can magically reset the group offset to -1, which will cause
+                    // `targetOffsetAndMetadata` here is null. For this case, just sync the offset to target.
+                    log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.",
+                            consumerGroupId, topicPartition);
                 }
                 offsetToSync.put(topicPartition, convertedOffset);
             }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index f9bc7bc76cb..8d8e8bd3a0d 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -169,6 +169,33 @@ public class MirrorCheckpointTaskTest {
                 "Consumer 2 " + topic2 + " failed");
     }
 
+    @Test
+    public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() {
+        Map> idleConsumerGroupsOffset = new HashMap<>();
+        Map> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer = "consumer";
+        String topic = "topic";
+        Map ct = new HashMap<>();
+        TopicPartition tp = new TopicPartition(topic, 0);
+        // Simulate other clients such as Sarama, which may reset group offsets to -1. This can cause
+        // the obtained `OffsetAndMetadata` of the target cluster to be null.
+        ct.put(tp, null);
+        idleConsumerGroupsOffset.put(consumer, ct);
+
+        Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200, 101, "metadata");
+        Map checkpointMap = new HashMap<>();
+        checkpointMap.put(cp.topicPartition(), cp);
+        checkpointsPerConsumerGroup.put(consumer, checkpointMap);
+
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("
[kafka] branch trunk updated: KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4083cd627e9 KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587) 4083cd627e9 is described below commit 4083cd627e9b62a979d1f3b312b6bacbbbee822c Author: hudeqi <1217150...@qq.com> AuthorDate: Sat Oct 21 00:17:51 2023 +0800 KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587) Reviewers: Chris Egerton --- .../kafka/connect/mirror/MirrorCheckpointTask.java | 19 +++ .../connect/mirror/MirrorCheckpointTaskTest.java | 27 ++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index a51846766a6..0bf2baa9d85 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -330,11 +330,20 @@ public class MirrorCheckpointTask extends SourceTask { // if translated offset from upstream is smaller than the current consumer offset // in the target, skip updating the offset for that partition -long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset(); -if (latestDownstreamOffset >= convertedOffset.offset()) { -log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for " -+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition); -continue; +OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition); +if (targetOffsetAndMetadata != null) { +long latestDownstreamOffset = targetOffsetAndMetadata.offset(); +if (latestDownstreamOffset >= convertedOffset.offset()) { 
+log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for " ++ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition); +continue; +} +} else { +// It is possible that when resetting offsets are performed in the java kafka client, the reset to -1 will be intercepted. +// However, there are some other types of clients such as sarama, which can magically reset the group offset to -1, which will cause +// `targetOffsetAndMetadata` here is null. For this case, just sync the offset to target. +log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.", +consumerGroupId, topicPartition); } offsetToSync.put(topicPartition, convertedOffset); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java index f9bc7bc76cb..8d8e8bd3a0d 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java @@ -169,6 +169,33 @@ public class MirrorCheckpointTaskTest { "Consumer 2 " + topic2 + " failed"); } +@Test +public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() { +Map> idleConsumerGroupsOffset = new HashMap<>(); +Map> checkpointsPerConsumerGroup = new HashMap<>(); + +String consumer = "consumer"; +String topic = "topic"; +Map ct = new HashMap<>(); +TopicPartition tp = new TopicPartition(topic, 0); +// Simulate other clients such as Sarama, which may reset group offsets to -1. This can cause +// the obtained `OffsetAndMetadata` of the target cluster to be null. 
+ct.put(tp, null); +idleConsumerGroupsOffset.put(consumer, ct); + +Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200, 101, "metadata"); +Map checkpointMap = new HashMap<>(); +checkpointMap.put(cp.topicPartition(), cp); +checkpointsPerConsumerGroup.put(consumer, checkpointMap); + +MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask(&quo
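The null check added to MirrorCheckpointTask in the hunk above can be sketched in isolation. This is a minimal illustration, not the task's actual code: the class and method names are hypothetical, partitions are plain strings, and `Long` values stand in for `OffsetAndMetadata`. For brevity it treats a missing key the same as a key mapped to null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the offset-sync decision; not part of Kafka.
final class NullSafeOffsetSyncSketch {

    // Returns the partitions whose translated upstream offset should be synced to the target.
    static Map<String, Long> offsetsToSync(Map<String, Long> targetOffsets,
                                           Map<String, Long> convertedOffsets) {
        Map<String, Long> offsetToSync = new HashMap<>();
        for (Map.Entry<String, Long> entry : convertedOffsets.entrySet()) {
            String partition = entry.getKey();
            long converted = entry.getValue();
            // The lookup may yield null, e.g. when the group offset was reset to -1.
            Long latestDownstream = targetOffsets.get(partition);
            if (latestDownstream != null && latestDownstream >= converted) {
                // The target is already at or ahead of the translated offset: skip it.
                continue;
            }
            // Either the target is behind or it has no usable offset: sync.
            offsetToSync.put(partition, converted);
        }
        return offsetToSync;
    }
}
```

Dereferencing the lookup result directly, as the removed line did with `.offset()`, is what raised the NPE; holding the result in a nullable variable first lets the null case fall through to the sync.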
[kafka] branch trunk updated: KAFKA-15473: Hide duplicate plugins in /connector-plugins (#14398)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b088307612b KAFKA-15473: Hide duplicate plugins in /connector-plugins (#14398) b088307612b is described below commit b088307612b59d9864fb7e3096dc9a0b47d7273d Author: Greg Harris AuthorDate: Tue Sep 19 09:37:21 2023 -0700 KAFKA-15473: Hide duplicate plugins in /connector-plugins (#14398) Reviewers: Yash Mayya , Sagar Rao , Hector Geraldino , Chris Egerton --- .../rest/resources/ConnectorPluginsResource.java| 8 +--- .../resources/ConnectorPluginsResourceTest.java | 21 ++--- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 947c467ae1a..037d98b68e6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -42,8 +42,10 @@ import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -55,12 +57,12 @@ public class ConnectorPluginsResource implements ConnectResource { private static final String ALIAS_SUFFIX = "Connector"; private final Herder herder; -private final List connectorPlugins; +private final Set connectorPlugins; private long requestTimeoutMs; public ConnectorPluginsResource(Herder herder) { this.herder = herder; -this.connectorPlugins 
= new ArrayList<>(); +this.connectorPlugins = new LinkedHashSet<>(); this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS; // TODO: improve once plugins are allowed to be added/removed during runtime. @@ -126,7 +128,7 @@ public class ConnectorPluginsResource implements ConnectResource { .filter(p -> PluginType.SINK.toString().equals(p.type()) || PluginType.SOURCE.toString().equals(p.type())) .collect(Collectors.toList())); } else { -return Collections.unmodifiableList(connectorPlugins); +return Collections.unmodifiableList(new ArrayList<>(connectorPlugins)); } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index c39017adc40..52ac14ca1cd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -34,6 +34,8 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.SampleSinkConnector; import org.apache.kafka.connect.runtime.SampleSourceConnector; import org.apache.kafka.connect.runtime.distributed.DistributedHerder; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -62,9 +64,11 @@ import org.mockito.ArgumentCaptor; import org.mockito.junit.MockitoJUnitRunner; import javax.ws.rs.BadRequestException; +import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import 
java.util.HashSet; import java.util.LinkedList; @@ -120,6 +124,7 @@ public class ConnectorPluginsResourceTest { static { try { ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader(); +ClassLoader pluginClassLoader = new PluginClassLoader(DelegatingClassLoaderTest.ARBITRARY_URL, new URL[]{}, classLoader); String appVersion = AppInfoParser.getVersion(); SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, classLoader)); SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSinkConnector.class, appVersion,
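The switch from `ArrayList` to `LinkedHashSet` in ConnectorPluginsResource can be illustrated with a small standalone sketch. The class and method names here are hypothetical, and strings stand in for `PluginDesc`; which plugin descriptions count as duplicates depends on `PluginDesc`'s own `equals`/`hashCode`, which this digest does not show.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of order-preserving de-duplication; not part of Kafka.
final class PluginDedupSketch {

    // Drops repeated entries while keeping the first-seen order, then exposes a read-only list.
    static List<String> distinctInOrder(List<String> discovered) {
        // LinkedHashSet rejects duplicates but iterates in insertion order.
        Set<String> unique = new LinkedHashSet<>(discovered);
        // Copy into a list so callers keep a List-typed, unmodifiable view.
        return Collections.unmodifiableList(new ArrayList<>(unique));
    }
}
```

A plain `HashSet` would also remove duplicates, but it gives no ordering guarantee, so the plugin listing could come back in a different order from one call to the next.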
[kafka] branch trunk updated: KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7872a1ff5b2 KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371) 7872a1ff5b2 is described below commit 7872a1ff5b2e9a0fbbe3d71180a97e29f1549d4f Author: Yash Mayya AuthorDate: Tue Sep 19 16:39:39 2023 +0100 KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371) Reviewers: Sagar Rao , Chris Egerton --- .../integration/ConnectWorkerIntegrationTest.java | 8 +++ .../ConnectorTopicsIntegrationTest.java| 8 +++ .../integration/ErrorHandlingIntegrationTest.java | 8 +++ .../RebalanceSourceConnectorsIntegrationTest.java | 4 ++-- .../clusters/EmbeddedConnectClusterAssertions.java | 26 +- 5 files changed, 24 insertions(+), 30 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 2e843cd6ec6..4c393d95ad3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -431,9 +431,9 @@ public class ConnectWorkerIntegrationTest { // Delete the connector connect.deleteConnector(CONNECTOR_NAME); -connect.assertions().assertConnectorAndTasksAreNotRunning( +connect.assertions().assertConnectorDoesNotExist( CONNECTOR_NAME, -"Connector tasks were not destroyed in time" +"Connector wasn't deleted in time" ); } @@ -505,9 +505,9 @@ public class ConnectWorkerIntegrationTest { // Can delete a stopped connector connect.deleteConnector(CONNECTOR_NAME); -connect.assertions().assertConnectorAndTasksAreNotRunning( 
+connect.assertions().assertConnectorDoesNotExist( CONNECTOR_NAME, -"Connector and all of its tasks should no longer be running" +"Connector wasn't deleted in time" ); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index a8b812f8c31..0614ba8a9f7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -150,8 +150,8 @@ public class ConnectorTopicsIntegrationTest { // deleting a connector resets its active topics connect.deleteConnector(BAR_CONNECTOR); - connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR, -"Connector tasks did not stop in time."); +connect.assertions().assertConnectorDoesNotExist(BAR_CONNECTOR, +"Connector wasn't deleted in time."); connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for deleted connector: " + BAR_CONNECTOR); @@ -205,8 +205,8 @@ public class ConnectorTopicsIntegrationTest { // deleting a connector resets its active topics connect.deleteConnector(FOO_CONNECTOR); - connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR, -"Connector tasks did not stop in time."); +connect.assertions().assertConnectorDoesNotExist(FOO_CONNECTOR, +"Connector wasn't deleted in time."); connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for deleted connector: " + FOO_CONNECTOR); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 7d3c1d6924b..55479e6d4ff 100644 --- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -179,8 +179,8 @@ public class ErrorHandlingIntegrationTest { } connect.deleteConnector(CONNECTOR_NAME); - connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME, -
[kafka] branch 3.6 updated: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case (#14313)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new 0db8e8c5f2e KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case (#14313) 0db8e8c5f2e is described below commit 0db8e8c5f2e97577d71cea24ad5d605124850ba9 Author: Chris Egerton AuthorDate: Thu Sep 7 19:24:17 2023 -0400 KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case (#14313) Reviewers: Philip Nee , Greg Harris --- .../kafka/connect/mirror/OffsetSyncStore.java | 3 +- .../org/apache/kafka/connect/util/TopicAdmin.java | 32 +++--- .../apache/kafka/connect/util/TopicAdminTest.java | 107 +++-- 3 files changed, 79 insertions(+), 63 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index 63a91a11b45..7ba3deaad29 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Producer; @@ -75,7 +74,7 @@ class OffsetSyncStore implements AutoCloseable { try { consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig()); admin = new TopicAdmin( - config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), +config.offsetSyncsTopicAdminConfig(), config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())); store = createBackingStore(config, consumer, admin); } catch (Throwable t) { diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index f8c30dc13b3..3db23d9e909 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -286,20 +286,30 @@ public class TopicAdmin implements AutoCloseable { * @param adminConfig the configuration for the {@link Admin} */ public TopicAdmin(Map adminConfig) { -this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig)); +this(adminConfig, Admin.create(adminConfig)); } -public TopicAdmin(Object bootstrapServers, Admin adminClient) { -this(bootstrapServers, adminClient, true); +public TopicAdmin(Map adminConfig, Admin adminClient) { +this(bootstrapServers(adminConfig), adminClient, true); } // visible for testing -TopicAdmin(Object bootstrapServers, Admin adminClient, boolean logCreation) { +TopicAdmin(Admin adminClient) { +this(null, adminClient, true); +} + +// visible for testing +TopicAdmin(String bootstrapServers, Admin adminClient, boolean logCreation) { this.admin = adminClient; -this.bootstrapServers = bootstrapServers != null ? bootstrapServers.toString() : ""; +this.bootstrapServers = bootstrapServers != null ? bootstrapServers : ""; this.logCreation = logCreation; } +private static String bootstrapServers(Map adminConfig) { +Object result = adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); +return result != null ? result.toString() : null; +} + /** * Attempt to create the topic described by the given definition, returning true if the topic was created or false * if the topic already existed. 
@@ -720,23 +730,23 @@ public class TopicAdmin implements AutoCloseable { String topic = partition.topic(); if (cause instanceof AuthorizationException) { String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers); -throw new ConnectException(msg, e); +throw new ConnectException(msg, cause); } else if (cause instanceof UnsupportedVersionException) { // Should theoretically never happen, because this method is the same as what the consumer uses and therefore // should exist in the broker since before the admin client was added String msg = String.format("API to get the get the end offsets for t
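The constructor refactoring in TopicAdmin moves the null handling for the bootstrap servers value into one helper. A minimal standalone sketch of that pattern follows; the class name and the `forMessages` method are hypothetical, and the config key is inlined as the literal `"bootstrap.servers"` so that the example does not need the Kafka clients library.

```java
import java.util.Map;

// Hypothetical illustration of null-safe config extraction; not part of Kafka.
final class BootstrapServersSketch {

    // Inlined here; in Kafka this key is AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG.
    static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

    // Returns the configured value as a string, or null when the key is absent.
    static String bootstrapServers(Map<String, Object> adminConfig) {
        Object result = adminConfig.get(BOOTSTRAP_SERVERS_CONFIG);
        return result != null ? result.toString() : null;
    }

    // Mirrors the constructor's fallback: an absent value becomes an empty string for log messages.
    static String forMessages(Map<String, Object> adminConfig) {
        String servers = bootstrapServers(adminConfig);
        return servers != null ? servers : "";
    }
}
```

Taking the whole config map rather than a pre-extracted `Object` keeps the lookup in one place, so callers such as OffsetSyncStore no longer need to know the key name.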
[kafka] branch trunk updated: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case (#14313)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 54ab5b29e47 KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case (#14313) 54ab5b29e47 is described below commit 54ab5b29e47fd3f399e0575df2f490a33f12804b Author: Chris Egerton AuthorDate: Thu Sep 7 19:24:17 2023 -0400 KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case (#14313) Reviewers: Philip Nee , Greg Harris --- .../kafka/connect/mirror/OffsetSyncStore.java | 3 +- .../org/apache/kafka/connect/util/TopicAdmin.java | 32 +++--- .../apache/kafka/connect/util/TopicAdminTest.java | 107 +++-- 3 files changed, 79 insertions(+), 63 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index 63a91a11b45..7ba3deaad29 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Producer; @@ -75,7 +74,7 @@ class OffsetSyncStore implements AutoCloseable { try { consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig()); admin = new TopicAdmin( - config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), +config.offsetSyncsTopicAdminConfig(), config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())); store = createBackingStore(config, consumer, admin); } catch (Throwable t) { diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index f8c30dc13b3..3db23d9e909 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -286,20 +286,30 @@ public class TopicAdmin implements AutoCloseable { * @param adminConfig the configuration for the {@link Admin} */ public TopicAdmin(Map adminConfig) { -this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig)); +this(adminConfig, Admin.create(adminConfig)); } -public TopicAdmin(Object bootstrapServers, Admin adminClient) { -this(bootstrapServers, adminClient, true); +public TopicAdmin(Map adminConfig, Admin adminClient) { +this(bootstrapServers(adminConfig), adminClient, true); } // visible for testing -TopicAdmin(Object bootstrapServers, Admin adminClient, boolean logCreation) { +TopicAdmin(Admin adminClient) { +this(null, adminClient, true); +} + +// visible for testing +TopicAdmin(String bootstrapServers, Admin adminClient, boolean logCreation) { this.admin = adminClient; -this.bootstrapServers = bootstrapServers != null ? bootstrapServers.toString() : ""; +this.bootstrapServers = bootstrapServers != null ? bootstrapServers : ""; this.logCreation = logCreation; } +private static String bootstrapServers(Map adminConfig) { +Object result = adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); +return result != null ? result.toString() : null; +} + /** * Attempt to create the topic described by the given definition, returning true if the topic was created or false * if the topic already existed. 
@@ -720,23 +730,23 @@ public class TopicAdmin implements AutoCloseable { String topic = partition.topic(); if (cause instanceof AuthorizationException) { String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers); -throw new ConnectException(msg, e); +throw new ConnectException(msg, cause); } else if (cause instanceof UnsupportedVersionException) { // Should theoretically never happen, because this method is the same as what the consumer uses and therefore // should exist in the broker since before the admin client was added String msg = String.format("API to get the get the end of
[kafka-site] branch asf-site updated: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#539)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 77ecf44c KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#539)
77ecf44c is described below

commit 77ecf44cc39788f14700547fc182b63aa7b8aa9b
Author: Yash Mayya
AuthorDate: Thu Sep 7 17:39:01 2023 +0100

    KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#539)

    Reviewers: Chris Egerton
---
 35/connect.html | 5 +
 1 file changed, 5 insertions(+)

diff --git a/35/connect.html b/35/connect.html
index 20872b13..976fda0b 100644
--- a/35/connect.html
+++ b/35/connect.html
@@ -934,6 +934,7 @@ Struct struct = new Struct(schema)
 UNASSIGNED: The connector/task has not yet been assigned to a worker.
 RUNNING: The connector/task is running.
 PAUSED: The connector/task has been administratively paused.
+STOPPED: The connector has been stopped. Note that this state is not applicable to tasks because the tasks for a stopped connector are shut down and won't be visible in the status API.
 FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).
 RESTARTING: The connector/task is either actively restarting or is expected to restart soon
@@ -949,6 +950,10 @@ Struct struct = new Struct(schema)
 It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. T [...]
+
+
+In 3.5.0, Connect introduced a stop API that completely shuts down the tasks for a connector and deallocates any resources claimed by them. This is different from pausing a connector where tasks are left idling and any resources claimed by them are left allocated (which allows the connector to begin processing data quickly once it is resumed). Stopping a connector is more efficient from a resource usage standpoint than pausing it, but can cause it to take longer to begin processing d [...]
+
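The pause/resume and stop APIs described in this docs change are exposed through the Connect REST interface as `PUT /connectors/{name}/pause`, `PUT /connectors/{name}/resume` and `PUT /connectors/{name}/stop`. The sketch below only builds such requests with the JDK's `java.net.http` client and does not send them. The class name, the worker URL and the connector name are assumptions for illustration, and the connector name is assumed to need no URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for building Connect lifecycle requests; not part of Kafka.
final class ConnectorLifecycleRequests {

    // Builds a body-less PUT to <workerUrl>/connectors/<connector>/<action>,
    // where action is expected to be "pause", "resume" or "stop".
    static HttpRequest lifecycleRequest(String workerUrl, String connector, String action) {
        URI uri = URI.create(workerUrl + "/connectors/" + connector + "/" + action);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

For example, `lifecycleRequest("http://localhost:8083", "my-connector", "stop")` describes the call that would shut down the connector's tasks, while the same call with `"pause"` would leave them allocated but idle. Sending the request would go through `HttpClient.send`, which is left out here because it needs a running worker.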
[kafka] branch trunk updated: KAFKA-15179: Add integration tests for the file sink and source connectors (#14279)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 88b554fdbd4 KAFKA-15179: Add integration tests for the file sink and source connectors (#14279) 88b554fdbd4 is described below commit 88b554fdbd47d4820059bc3633f5b2e3d2adb12d Author: Yash Mayya AuthorDate: Thu Sep 7 17:24:13 2023 +0100 KAFKA-15179: Add integration tests for the file sink and source connectors (#14279) Reviewers: Ashwin Pankaj , Chris Egerton --- build.gradle | 4 + .../FileStreamSinkConnectorIntegrationTest.java| 192 ++ .../FileStreamSourceConnectorIntegrationTest.java | 214 + connect/file/src/test/resources/log4j.properties | 28 +++ .../ExactlyOnceSourceIntegrationTest.java | 8 +- .../util/clusters/EmbeddedConnectCluster.java | 40 .../util/clusters/EmbeddedKafkaCluster.java| 13 ++ 7 files changed, 492 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index b332932c63b..20d1328df59 100644 --- a/build.gradle +++ b/build.gradle @@ -2915,6 +2915,10 @@ project(':connect:file') { testRuntimeOnly libs.slf4jlog4j testImplementation project(':clients').sourceSets.test.output +testImplementation project(':connect:runtime') +testImplementation project(':connect:runtime').sourceSets.test.output +testImplementation project(':core') +testImplementation project(':core').sourceSets.test.output } javadoc { diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java new file mode 100644 index 000..433c2004710 --- /dev/null +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = 
"test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_PREFIX = "Message "; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +
[kafka] branch 3.5 updated: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.5 by this push:
     new b8cf3e31747 KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336)
b8cf3e31747 is described below

commit b8cf3e31747f7193024c36f3381c0dd5bd22158c
Author: Chris Egerton
AuthorDate: Tue Sep 5 14:46:37 2023 -0400

    KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336)

    Original author (before modifications for backporting): Yash Mayya

    Reviewers: Chris Egerton
---
 docs/connect.html | 5 +
 1 file changed, 5 insertions(+)

diff --git a/docs/connect.html b/docs/connect.html
index 5a053e4a0dc..40213ad3003 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -934,6 +934,7 @@ Struct struct = new Struct(schema)
 UNASSIGNED: The connector/task has not yet been assigned to a worker.
 RUNNING: The connector/task is running.
 PAUSED: The connector/task has been administratively paused.
+STOPPED: The connector has been stopped. Note that this state is not applicable to tasks because the tasks for a stopped connector are shut down and won't be visible in the status API.
 FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).
 RESTARTING: The connector/task is either actively restarting or is expected to restart soon
@@ -949,6 +950,10 @@ Struct struct = new Struct(schema)
 It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. T [...]
+
+
+In 3.5.0, Connect introduced a stop API that completely shuts down the tasks for a connector and deallocates any resources claimed by them. This is different from pausing a connector where tasks are left idling and any resources claimed by them are left allocated (which allows the connector to begin processing data quickly once it is resumed). Stopping a connector is more efficient from a resource usage standpoint than pausing it, but can cause it to take longer to begin processing d [...]
+
[kafka] branch 3.6 updated: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new 4f855576e69 KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336) 4f855576e69 is described below commit 4f855576e697d146a3bfa2256ffd807c419753bc Author: Yash Mayya AuthorDate: Tue Sep 5 19:39:49 2023 +0100 KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336) Reviewers: Chris Egerton --- docs/connect.html | 5 + 1 file changed, 5 insertions(+) diff --git a/docs/connect.html b/docs/connect.html index aecaccab3af..2deb8901888 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -1065,6 +1065,7 @@ Struct struct = new Struct(schema) UNASSIGNED: The connector/task has not yet been assigned to a worker. RUNNING: The connector/task is running. PAUSED: The connector/task has been administratively paused. +STOPPED: The connector has been stopped. Note that this state is not applicable to tasks because the tasks for a stopped connector are shut down and won't be visible in the status API. FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output). RESTARTING: The connector/task is either actively restarting or is expected to restart soon @@ -1080,6 +1081,10 @@ Struct struct = new Struct(schema) It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. T [...] 
+ + +In 3.5.0, Connect introduced a stop API that completely shuts down the tasks for a connector and deallocates any resources claimed by them. This is different from pausing a connector where tasks are left idling and any resources claimed by them are left allocated (which allows the connector to begin processing data quickly once it is resumed). Stopping a connector is more efficient from a resource usage standpoint than pausing it, but can cause it to take longer to begin processing d [...] +
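For readers who want to try the stop API described in the docs change above, here is a minimal sketch of building the state-change requests with the JDK's `java.net.http` client. It assumes the Connect REST endpoints `PUT /connectors/{name}/pause`, `/resume` and `/stop`; the worker URL `http://localhost:8083`, the connector name `my-connector` and the class name `ConnectorStateRequests` are hypothetical and not part of the commit.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ConnectorStateRequests {
    // Builds a body-less PUT request for a connector state transition.
    // "action" is expected to be one of "pause", "resume" or "stop" (assumed endpoint names).
    public static HttpRequest stateRequest(String workerUrl, String connector, String action) {
        URI uri = URI.create(workerUrl + "/connectors/" + connector + "/" + action);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical worker address and connector name, for illustration only.
        HttpRequest stop = stateRequest("http://localhost:8083", "my-connector", "stop");
        System.out.println(stop.method() + " " + stop.uri());
    }
}
```

Sending the request (for example with `HttpClient.newHttpClient().send(...)`) is left out so the sketch runs without a live Connect worker. Per the docs text above, a stopped connector's tasks are shut down and should no longer appear in the status API.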
[kafka-site] branch asf-site updated: MINOR: Add missing entries for Kafka Connect to the documentation's table of contents (#538)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new f3f9 MINOR: Add missing entries for Kafka Connect to the documentation's table of contents (#538) f3f9 is described below commit f3f99cd38831be1db8f5e821194eb810164f Author: Yash Mayya AuthorDate: Tue Sep 5 19:41:48 2023 +0100 MINOR: Add missing entries for Kafka Connect to the documentation's table of contents (#538) Reviewers: Chris Egerton --- 35/connect.html | 4 ++-- 35/toc.html | 8 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/35/connect.html b/35/connect.html index e397aa79..20872b13 100644 --- a/35/connect.html +++ b/35/connect.html @@ -822,7 +822,7 @@ if (inputsChanged()) SinkConnectors usually only have to handle the addition of streams, which may translate to new entries in their outputs (e.g., a new database table). The framework manages any changes to the Kafka input, such as when the set of input topics changes because of a regex subscription. SinkTasks should expect new input streams, which may require creating new resources in the downstream system, such as a new table in a database. The trickiest situation to handl [...] -Connect Configuration Validation +Configuration Validation Kafka Connect allows you to validate connector configurations before submitting a connector to be executed and can provide feedback about errors and recommended values. To take advantage of this, connector developers need to provide an implementation of config() to expose the configuration definition to the framework. @@ -868,7 +868,7 @@ Struct struct = new Struct(schema) Sink connectors are usually simpler because they are consuming data and therefore do not need to create schemas. However, they should take just as much care to validate that the schemas they receive have the expected format. 
When the schema does not match -- usually indicating the upstream producer is generating invalid data that cannot be correctly translated to the destination system -- sink connectors should throw an exception to indicate this error to the system. -Kafka Connect Administration +8.4 Administration Kafka Connect's REST layer provides a set of APIs to enable administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (e.g. changing configuration and restarting tasks). diff --git a/35/toc.html b/35/toc.html index ce444871..93fcbcd7 100644 --- a/35/toc.html +++ b/35/toc.html @@ -203,6 +203,14 @@ Exactly-once support 8.3 Connector Development Guide + +Core Concepts and APIs +Developing a Simple Connector +Dynamic Input/Output Streams +Configuration Validation +Working with Schemas + +8.4 Administration 9. Kafka Streams
[kafka] branch trunk updated: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1f473ebb5ea KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336) 1f473ebb5ea is described below commit 1f473ebb5ea9ad4ebdfdc99051864cce6d80db87 Author: Yash Mayya AuthorDate: Tue Sep 5 19:39:49 2023 +0100 KAFKA-14876: Add stopped state to Kafka Connect Administration docs section (#14336) Reviewers: Chris Egerton --- docs/connect.html | 5 + 1 file changed, 5 insertions(+) diff --git a/docs/connect.html b/docs/connect.html index aecaccab3af..2deb8901888 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -1065,6 +1065,7 @@ Struct struct = new Struct(schema) UNASSIGNED: The connector/task has not yet been assigned to a worker. RUNNING: The connector/task is running. PAUSED: The connector/task has been administratively paused. +STOPPED: The connector has been stopped. Note that this state is not applicable to tasks because the tasks for a stopped connector are shut down and won't be visible in the status API. FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output). RESTARTING: The connector/task is either actively restarting or is expected to restart soon @@ -1080,6 +1081,10 @@ Struct struct = new Struct(schema) It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. T [...] 
+ + +In 3.5.0, Connect introduced a stop API that completely shuts down the tasks for a connector and deallocates any resources claimed by them. This is different from pausing a connector where tasks are left idling and any resources claimed by them are left allocated (which allows the connector to begin processing data quickly once it is resumed). Stopping a connector is more efficient from a resource usage standpoint than pausing it, but can cause it to take longer to begin processing d [...] +
[kafka] branch 3.5 updated: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.5 by this push: new fb85e9d4aaf MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337) fb85e9d4aaf is described below commit fb85e9d4aafa3cc079d9fd7a4e0ac751ab9ac088 Author: Yash Mayya AuthorDate: Tue Sep 5 18:58:44 2023 +0100 MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337) Reviewers: Chris Egerton --- docs/connect.html | 4 ++-- docs/toc.html | 8 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/docs/connect.html b/docs/connect.html index 32fd479b0de..5a053e4a0dc 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -822,7 +822,7 @@ if (inputsChanged()) SinkConnectors usually only have to handle the addition of streams, which may translate to new entries in their outputs (e.g., a new database table). The framework manages any changes to the Kafka input, such as when the set of input topics changes because of a regex subscription. SinkTasks should expect new input streams, which may require creating new resources in the downstream system, such as a new table in a database. The trickiest situation to handl [...] -Connect Configuration Validation +Configuration Validation Kafka Connect allows you to validate connector configurations before submitting a connector to be executed and can provide feedback about errors and recommended values. To take advantage of this, connector developers need to provide an implementation of config() to expose the configuration definition to the framework. @@ -868,7 +868,7 @@ Struct struct = new Struct(schema) Sink connectors are usually simpler because they are consuming data and therefore do not need to create schemas. 
However, they should take just as much care to validate that the schemas they receive have the expected format. When the schema does not match -- usually indicating the upstream producer is generating invalid data that cannot be correctly translated to the destination system -- sink connectors should throw an exception to indicate this error to the system. -Kafka Connect Administration +8.4 Administration Kafka Connect's REST layer provides a set of APIs to enable administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (e.g. changing configuration and restarting tasks). diff --git a/docs/toc.html b/docs/toc.html index 514e98bbb35..c0c991ab2e5 100644 --- a/docs/toc.html +++ b/docs/toc.html @@ -203,6 +203,14 @@ Exactly-once support 8.3 Connector Development Guide + +Core Concepts and APIs +Developing a Simple Connector +Dynamic Input/Output Streams +Configuration Validation +Working with Schemas + +8.4 Administration 9. Kafka Streams
[kafka] branch 3.6 updated: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new 3c50c382afd MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337) 3c50c382afd is described below commit 3c50c382afd0ade5228069c3e9518c469f3ca3b7 Author: Yash Mayya AuthorDate: Tue Sep 5 18:58:44 2023 +0100 MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337) Reviewers: Chris Egerton --- docs/connect.html | 4 ++-- docs/toc.html | 8 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/docs/connect.html b/docs/connect.html index d324f4384b4..aecaccab3af 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -953,7 +953,7 @@ if (inputsChanged()) SinkConnectors usually only have to handle the addition of streams, which may translate to new entries in their outputs (e.g., a new database table). The framework manages any changes to the Kafka input, such as when the set of input topics changes because of a regex subscription. SinkTasks should expect new input streams, which may require creating new resources in the downstream system, such as a new table in a database. The trickiest situation to handl [...] -Connect Configuration Validation +Configuration Validation Kafka Connect allows you to validate connector configurations before submitting a connector to be executed and can provide feedback about errors and recommended values. To take advantage of this, connector developers need to provide an implementation of config() to expose the configuration definition to the framework. @@ -999,7 +999,7 @@ Struct struct = new Struct(schema) Sink connectors are usually simpler because they are consuming data and therefore do not need to create schemas. 
However, they should take just as much care to validate that the schemas they receive have the expected format. When the schema does not match -- usually indicating the upstream producer is generating invalid data that cannot be correctly translated to the destination system -- sink connectors should throw an exception to indicate this error to the system. -Kafka Connect Administration +8.4 Administration Kafka Connect's REST layer provides a set of APIs to enable administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (e.g. changing configuration and restarting tasks). diff --git a/docs/toc.html b/docs/toc.html index 4b8bf3d35e7..88dd62c92dd 100644 --- a/docs/toc.html +++ b/docs/toc.html @@ -207,6 +207,14 @@ Plugin Discovery 8.3 Connector Development Guide + +Core Concepts and APIs +Developing a Simple Connector +Dynamic Input/Output Streams +Configuration Validation +Working with Schemas + +8.4 Administration 9. Kafka Streams
[kafka] branch trunk updated: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 79598b49d6f MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337) 79598b49d6f is described below commit 79598b49d6fff9bef686500f46a288b61a9013fd Author: Yash Mayya AuthorDate: Tue Sep 5 18:58:44 2023 +0100 MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect (#14337) Reviewers: Chris Egerton --- docs/connect.html | 4 ++-- docs/toc.html | 8 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/docs/connect.html b/docs/connect.html index d324f4384b4..aecaccab3af 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -953,7 +953,7 @@ if (inputsChanged()) SinkConnectors usually only have to handle the addition of streams, which may translate to new entries in their outputs (e.g., a new database table). The framework manages any changes to the Kafka input, such as when the set of input topics changes because of a regex subscription. SinkTasks should expect new input streams, which may require creating new resources in the downstream system, such as a new table in a database. The trickiest situation to handl [...] -Connect Configuration Validation +Configuration Validation Kafka Connect allows you to validate connector configurations before submitting a connector to be executed and can provide feedback about errors and recommended values. To take advantage of this, connector developers need to provide an implementation of config() to expose the configuration definition to the framework. @@ -999,7 +999,7 @@ Struct struct = new Struct(schema) Sink connectors are usually simpler because they are consuming data and therefore do not need to create schemas. 
However, they should take just as much care to validate that the schemas they receive have the expected format. When the schema does not match -- usually indicating the upstream producer is generating invalid data that cannot be correctly translated to the destination system -- sink connectors should throw an exception to indicate this error to the system. -Kafka Connect Administration +8.4 Administration Kafka Connect's REST layer provides a set of APIs to enable administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (e.g. changing configuration and restarting tasks). diff --git a/docs/toc.html b/docs/toc.html index 4b8bf3d35e7..88dd62c92dd 100644 --- a/docs/toc.html +++ b/docs/toc.html @@ -207,6 +207,14 @@ Plugin Discovery 8.3 Connector Development Guide + +Core Concepts and APIs +Developing a Simple Connector +Dynamic Input/Output Streams +Configuration Validation +Working with Schemas + +8.4 Administration 9. Kafka Streams
[kafka] branch trunk updated (8611d28b2e2 -> 1c5020e1429)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 8611d28b2e2 KAFKA-15392: Prevent shadowing RestServer shutdown exceptions (#14277) add 1c5020e1429 KAFKA-13327: Gracefully report connector validation errors instead of returning 500 responses (#14303) No new revisions were added by this update. Summary of changes: .../kafka/connect/runtime/AbstractHerder.java | 5 +- .../kafka/connect/runtime/ConnectorConfig.java | 5 +- .../kafka/connect/runtime/SinkConnectorConfig.java | 74 +++-- .../runtime/distributed/DistributedHerder.java | 29 +- .../ConnectorValidationIntegrationTest.java| 336 + .../kafka/connect/runtime/AbstractHerderTest.java | 27 +- .../runtime/distributed/DistributedHerderTest.java | 27 ++ .../clusters/EmbeddedConnectClusterAssertions.java | 22 +- 8 files changed, 488 insertions(+), 37 deletions(-) create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
[kafka] branch trunk updated: MINOR: Allow writing tombstone offsets for arbitrary partitions in the FileStreamSourceConnector (#14234)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7802c264c96 MINOR: Allow writing tombstone offsets for arbitrary partitions in the FileStreamSourceConnector (#14234) 7802c264c96 is described below commit 7802c264c96ae27167cf38c263b86398aa0ea3fe Author: Yash Mayya AuthorDate: Thu Aug 17 19:13:53 2023 +0100 MINOR: Allow writing tombstone offsets for arbitrary partitions in the FileStreamSourceConnector (#14234) Reviewers: Chris Egerton --- .../connect/file/FileStreamSourceConnector.java | 21 +++-- .../connect/file/FileStreamSourceConnectorTest.java | 14 ++ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java index 13193f8f501..37cdcec1b05 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java @@ -117,18 +117,10 @@ public class FileStreamSourceConnector extends SourceConnector { // This connector makes use of a single source partition at a time which represents the file that it is configured to read from. // However, there could also be source partitions from previous configurations of the connector. 
for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) { -Map<String, ?> partition = partitionOffset.getKey(); -if (partition == null) { -throw new ConnectException("Partition objects cannot be null"); -} - -if (!partition.containsKey(FILENAME_FIELD)) { -throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); -} - Map<String, ?> offset = partitionOffset.getValue(); -// null offsets are allowed and represent a deletion of offsets for a partition if (offset == null) { +// We allow tombstones for anything; if there's garbage in the offsets for the connector, we don't +// want to prevent users from being able to clean it up using the REST API continue; } @@ -145,6 +137,15 @@ public class FileStreamSourceConnector extends SourceConnector { if (offsetPosition < 0) { throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset should be a non-negative value"); } + +Map<String, ?> partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} } // Let the task check whether the actual value for the offset position is valid for the configured file on startup diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java index 185faa80eb3..41915913b03 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java @@ -227,4 +227,18 @@ public class FileStreamSourceConnectorTest { assertTrue(connector.alterOffsets(sourceProperties, offsets)); assertTrue(connector.alterOffsets(sourceProperties, new HashMap<>())); } + +@Test +public void 
testAlterOffsetsTombstones() { +Function<Map<String, ?>, Boolean> alterOffsets = partition -> connector.alterOffsets( +sourceProperties, +Collections.singletonMap(partition, null) +); + +assertTrue(alterOffsets.apply(null)); +assertTrue(alterOffsets.apply(Collections.emptyMap())); +assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, FILENAME))); +assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"))); + assertTrue(alterOffsets.apply(Collections.singletonMap("garbage_partition_key", "garbage_partition_value"))); +} }
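The reordering in this commit (check for a null offset first, validate the partition only afterwards) can be illustrated with a small standalone sketch. It is not the connector's code: `IllegalArgumentException` stands in for `ConnectException`, the literal value of `FILENAME_FIELD` is an assumption, the offset position checks are omitted, and the class name `TombstoneFirstValidation` is hypothetical.

```java
import java.util.Collections;
import java.util.Map;

public class TombstoneFirstValidation {
    // Assumed key name; the real constant lives in the FileStream connector classes.
    static final String FILENAME_FIELD = "filename";

    // Returns true if all entries pass; throws on an invalid non-tombstone entry.
    public static boolean validate(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                // Tombstone: accepted for any partition, even a null or garbage one,
                // so that stale entries can always be cleaned up.
                continue;
            }
            // Only non-tombstone entries have their partition validated.
            Map<String, ?> partition = entry.getKey();
            if (partition == null) {
                throw new IllegalArgumentException("Partition objects cannot be null");
            }
            if (!partition.containsKey(FILENAME_FIELD)) {
                throw new IllegalArgumentException(
                        "Partition objects should contain the key '" + FILENAME_FIELD + "'");
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, ?> garbage = Collections.singletonMap("garbage_partition_key", "garbage_partition_value");
        Map<Map<String, ?>, Map<String, ?>> tombstone = Collections.singletonMap(garbage, null);
        System.out.println(validate(tombstone));
    }
}
```

With the previous ordering, the garbage partition would have been rejected before the null offset was ever inspected, which is the situation the new test case guards against.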
[kafka] branch trunk updated (d85a7008133 -> a253dc6643b)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from d85a7008133 MINOR: Do not reuse admin client across tests (#14225) add a253dc6643b KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2 (#14220) No new revisions were added by this update. Summary of changes: docs/upgrade.html | 4 1 file changed, 4 insertions(+)
[kafka] branch trunk updated: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors (#14005)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new de409b389d2 KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors (#14005) de409b389d2 is described below commit de409b389d26f7681fba8583db2b96584258aa48 Author: Chris Egerton AuthorDate: Thu Aug 17 09:33:59 2023 -0400 KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors (#14005) Reviewers: Yash Mayya , Greg Harris --- .../connect/mirror/MirrorCheckpointConnector.java | 31 .../connect/mirror/MirrorHeartbeatConnector.java | 30 .../connect/mirror/MirrorSourceConnector.java | 29 .../apache/kafka/connect/mirror/MirrorUtils.java | 173 +++-- .../mirror/MirrorCheckpointConnectorTest.java | 145 + .../mirror/MirrorHeartBeatConnectorTest.java | 138 .../connect/mirror/MirrorSourceConnectorTest.java | 141 + .../MirrorConnectorsIntegrationBaseTest.java | 102 +++- ...MirrorConnectorsIntegrationExactlyOnceTest.java | 52 +++ .../integration/ConnectWorkerIntegrationTest.java | 11 ++ .../clusters/EmbeddedConnectClusterAssertions.java | 5 - .../util/clusters/EmbeddedKafkaCluster.java| 9 +- 12 files changed, 844 insertions(+), 22 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java index 07e7b49a44e..1a146dcf5ac 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; import 
org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.util.ConnectorUtils; import org.slf4j.Logger; @@ -40,6 +41,9 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.kafka.connect.mirror.Checkpoint.CONSUMER_GROUP_ID_KEY; +import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY; + /** Replicate consumer group state between clusters. Emits checkpoint records. * * @see MirrorCheckpointConfig for supported config properties. @@ -132,6 +136,33 @@ public class MirrorCheckpointConnector extends SourceConnector { return AppInfoParser.getVersion(); } +@Override +public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { +for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry : offsets.entrySet()) { +Map<String, ?> sourceOffset = offsetEntry.getValue(); +if (sourceOffset == null) { +// We allow tombstones for anything; if there's garbage in the offsets for the connector, we don't +// want to prevent users from being able to clean it up using the REST API +continue; +} + +Map<String, ?> sourcePartition = offsetEntry.getKey(); +if (sourcePartition == null) { +throw new ConnectException("Source partitions may not be null"); +} + +MirrorUtils.validateSourcePartitionString(sourcePartition, CONSUMER_GROUP_ID_KEY); +MirrorUtils.validateSourcePartitionString(sourcePartition, TOPIC_KEY); +MirrorUtils.validateSourcePartitionPartition(sourcePartition); + +MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset, true); +} + +// We don't actually use these offsets in the task class, so no additional effort is required beyond just validating +// the format of the user-supplied offsets +return true; +} + private void refreshConsumerGroups() throws InterruptedException, ExecutionException { Set<String> consumerGroups = findConsumerGroups(); diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java index 6410e8fc3f9..6ab9fce31be 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.connec
[kafka] branch 3.1 updated: KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new f9fde0eec16 KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082) f9fde0eec16 is described below commit f9fde0eec161cd51f5675a1dcb51972fd3bd9ca5 Author: Omnia G.H Ibrahim AuthorDate: Wed Aug 16 00:58:52 2023 +0100 KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082) Reviewers: Chris Egerton --- .../connect/mirror/DefaultReplicationPolicy.java | 20 +++-- .../kafka/connect/mirror/MirrorClientConfig.java | 16 ++- .../connect/mirror/ReplicationPolicyTest.java | 52 ++ .../connect/mirror/MirrorConnectorConfig.java | 11 + 4 files changed, 95 insertions(+), 4 deletions(-) diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java index 9de50b603d7..65a98947d39 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java @@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR; public static final String SEPARATOR_DEFAULT = "."; +public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED; +public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true; + private String separator = SEPARATOR_DEFAULT; private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT)); +private boolean 
isInternalTopicSeparatorEnabled = true; @Override public void configure(Map<String, ?> props) { @@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable separator = (String) props.get(SEPARATOR_CONFIG); log.info("Using custom remote topic separator: '{}'", separator); separatorPattern = Pattern.compile(Pattern.quote(separator)); + +if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) { +isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString()); +if (!isInternalTopicSeparatorEnabled) { +log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator); +} +} } } @@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable } } +private String internalSeparator() { +return isInternalTopicSeparatorEnabled ? separator : "."; +} private String internalSuffix() { -return separator + "internal"; +return internalSeparator() + "internal"; } private String checkpointsTopicSuffix() { -return separator + "checkpoints" + internalSuffix(); +return internalSeparator() + "checkpoints" + internalSuffix(); } @Override public String offsetSyncsTopic(String clusterAlias) { -return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix(); +return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix(); } @Override diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java index 4305366f6fa..af5a9e684b8 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java @@ -52,7 +52,15 @@ public class MirrorClientConfig extends AbstractConfig { private static final String REPLICATION_POLICY_SEPARATOR_DOC = 
"Separator used in remote topic naming convention."; public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = DefaultReplicationPolicy.SEPARATOR_DEFAULT; - +public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled"; +public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC = +"Whether to use replication.policy.separator to control the names of
[kafka] branch 3.2 updated: KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.2 by this push:
     new 01f56ec7df7 KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
01f56ec7df7 is described below

commit 01f56ec7df7032362c36e76847fe0637052ed2b9
Author: Omnia G.H Ibrahim
AuthorDate: Wed Aug 16 00:58:52 2023 +0100

    KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)

    Reviewers: Chris Egerton
---
 .../connect/mirror/DefaultReplicationPolicy.java | 20 +++--
 .../kafka/connect/mirror/MirrorClientConfig.java | 16 ++-
 .../connect/mirror/ReplicationPolicyTest.java | 52 ++
 .../connect/mirror/MirrorConnectorConfig.java | 11 +
 4 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 9de50b603d7..65a98947d39 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
     public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
     public static final String SEPARATOR_DEFAULT = ".";
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
+    public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true;
+
     private String separator = SEPARATOR_DEFAULT;
     private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
+    private boolean isInternalTopicSeparatorEnabled = true;
 
     @Override
     public void configure(Map<String, ?> props) {
@@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
             separator = (String) props.get(SEPARATOR_CONFIG);
             log.info("Using custom remote topic separator: '{}'", separator);
             separatorPattern = Pattern.compile(Pattern.quote(separator));
+
+            if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+                isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
+                if (!isInternalTopicSeparatorEnabled) {
+                    log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator);
+                }
+            }
         }
     }
 
@@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
         }
     }
 
+    private String internalSeparator() {
+        return isInternalTopicSeparatorEnabled ? separator : ".";
+    }
     private String internalSuffix() {
-        return separator + "internal";
+        return internalSeparator() + "internal";
     }
 
     private String checkpointsTopicSuffix() {
-        return separator + "checkpoints" + internalSuffix();
+        return internalSeparator() + "checkpoints" + internalSuffix();
     }
 
     @Override
     public String offsetSyncsTopic(String clusterAlias) {
-        return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
+        return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
     }
 
     @Override
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 4305366f6fa..af5a9e684b8 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -52,7 +52,15 @@ public class MirrorClientConfig extends AbstractConfig {
     private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
     public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
         DefaultReplicationPolicy.SEPARATOR_DEFAULT;
-
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC =
+        "Whether to use replication.policy.separator to control the names of
[kafka] branch 3.3 updated: KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.3 by this push:
     new 2c71f53d732 KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
2c71f53d732 is described below

commit 2c71f53d732700da1da7e48734b9c996503d45ee
Author: Omnia G.H Ibrahim
AuthorDate: Wed Aug 16 00:58:52 2023 +0100

    KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)

    Reviewers: Chris Egerton
---
 .../connect/mirror/DefaultReplicationPolicy.java | 20 +++--
 .../kafka/connect/mirror/MirrorClientConfig.java | 16 ++-
 .../connect/mirror/ReplicationPolicyTest.java | 52 ++
 .../connect/mirror/MirrorConnectorConfig.java | 11 +
 4 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 9de50b603d7..65a98947d39 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
     public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
     public static final String SEPARATOR_DEFAULT = ".";
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
+    public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true;
+
     private String separator = SEPARATOR_DEFAULT;
     private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
+    private boolean isInternalTopicSeparatorEnabled = true;
 
     @Override
     public void configure(Map<String, ?> props) {
@@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
             separator = (String) props.get(SEPARATOR_CONFIG);
             log.info("Using custom remote topic separator: '{}'", separator);
             separatorPattern = Pattern.compile(Pattern.quote(separator));
+
+            if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+                isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
+                if (!isInternalTopicSeparatorEnabled) {
+                    log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator);
+                }
+            }
         }
     }
 
@@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
         }
     }
 
+    private String internalSeparator() {
+        return isInternalTopicSeparatorEnabled ? separator : ".";
+    }
     private String internalSuffix() {
-        return separator + "internal";
+        return internalSeparator() + "internal";
     }
 
     private String checkpointsTopicSuffix() {
-        return separator + "checkpoints" + internalSuffix();
+        return internalSeparator() + "checkpoints" + internalSuffix();
     }
 
     @Override
     public String offsetSyncsTopic(String clusterAlias) {
-        return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
+        return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
     }
 
     @Override
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index e3a1fec2e7d..928e8b049ab 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -56,7 +56,15 @@ public class MirrorClientConfig extends AbstractConfig {
     private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
     public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
         DefaultReplicationPolicy.SEPARATOR_DEFAULT;
-
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC =
+        "Whether to use replication.policy.separator to control the names of
[kafka] branch 3.4 updated: KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.4 by this push:
     new e173a826bc4 KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
e173a826bc4 is described below

commit e173a826bc40d8e95b2865149eaae119966a1dba
Author: Omnia G.H Ibrahim
AuthorDate: Wed Aug 16 00:58:52 2023 +0100

    KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)

    Reviewers: Chris Egerton
---
 .../connect/mirror/DefaultReplicationPolicy.java | 20 ++--
 .../kafka/connect/mirror/MirrorClientConfig.java | 14 ++
 .../connect/mirror/ReplicationPolicyTest.java | 53 ++
 .../connect/mirror/MirrorConnectorConfig.java | 11 +
 4 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 9de50b603d7..65a98947d39 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
     public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
     public static final String SEPARATOR_DEFAULT = ".";
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
+    public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true;
+
     private String separator = SEPARATOR_DEFAULT;
     private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
+    private boolean isInternalTopicSeparatorEnabled = true;
 
     @Override
     public void configure(Map<String, ?> props) {
@@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
             separator = (String) props.get(SEPARATOR_CONFIG);
             log.info("Using custom remote topic separator: '{}'", separator);
             separatorPattern = Pattern.compile(Pattern.quote(separator));
+
+            if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+                isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
+                if (!isInternalTopicSeparatorEnabled) {
+                    log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator);
+                }
+            }
         }
     }
 
@@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
         }
     }
 
+    private String internalSeparator() {
+        return isInternalTopicSeparatorEnabled ? separator : ".";
+    }
     private String internalSuffix() {
-        return separator + "internal";
+        return internalSeparator() + "internal";
     }
 
     private String checkpointsTopicSuffix() {
-        return separator + "checkpoints" + internalSuffix();
+        return internalSeparator() + "checkpoints" + internalSuffix();
     }
 
     @Override
     public String offsetSyncsTopic(String clusterAlias) {
-        return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
+        return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
     }
 
     @Override
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index bf1bd4826e1..477459895c5 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -59,6 +59,14 @@ public class MirrorClientConfig extends AbstractConfig {
     public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
         DefaultReplicationPolicy.SEPARATOR_DEFAULT;
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC =
+        "Whether to use replication.policy.separator to control the names of topics used for checkpoints and offset syncs. " +
+        "By default, custom separators are used in these topic name
[kafka] branch 3.5 updated: KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.5 by this push:
     new abd1c8e46fc KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
abd1c8e46fc is described below

commit abd1c8e46fc413f8cb8d2d07a80ee74eaf5d9708
Author: Omnia G.H Ibrahim
AuthorDate: Wed Aug 16 00:58:52 2023 +0100

    KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)

    Reviewers: Chris Egerton
---
 .../connect/mirror/DefaultReplicationPolicy.java | 20 ++--
 .../kafka/connect/mirror/MirrorClientConfig.java | 14 ++
 .../connect/mirror/ReplicationPolicyTest.java | 53 ++
 .../connect/mirror/MirrorConnectorConfig.java | 11 +
 4 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 9de50b603d7..65a98947d39 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
     public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
     public static final String SEPARATOR_DEFAULT = ".";
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
+    public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true;
+
     private String separator = SEPARATOR_DEFAULT;
     private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
+    private boolean isInternalTopicSeparatorEnabled = true;
 
     @Override
     public void configure(Map<String, ?> props) {
@@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
             separator = (String) props.get(SEPARATOR_CONFIG);
             log.info("Using custom remote topic separator: '{}'", separator);
             separatorPattern = Pattern.compile(Pattern.quote(separator));
+
+            if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+                isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
+                if (!isInternalTopicSeparatorEnabled) {
+                    log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator);
+                }
+            }
         }
     }
 
@@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
         }
     }
 
+    private String internalSeparator() {
+        return isInternalTopicSeparatorEnabled ? separator : ".";
+    }
     private String internalSuffix() {
-        return separator + "internal";
+        return internalSeparator() + "internal";
     }
 
     private String checkpointsTopicSuffix() {
-        return separator + "checkpoints" + internalSuffix();
+        return internalSeparator() + "checkpoints" + internalSuffix();
     }
 
     @Override
     public String offsetSyncsTopic(String clusterAlias) {
-        return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
+        return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
     }
 
     @Override
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index bf1bd4826e1..477459895c5 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -59,6 +59,14 @@ public class MirrorClientConfig extends AbstractConfig {
     public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
         DefaultReplicationPolicy.SEPARATOR_DEFAULT;
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC =
+        "Whether to use replication.policy.separator to control the names of topics used for checkpoints and offset syncs. " +
+        "By default, custom separators are used in these topic name
[kafka] branch trunk updated: KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 35e925f3535 KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)
35e925f3535 is described below

commit 35e925f3535e7774520317310505fcde946228d5
Author: Omnia G.H Ibrahim
AuthorDate: Wed Aug 16 00:58:52 2023 +0100

    KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)

    Reviewers: Chris Egerton
---
 .../connect/mirror/DefaultReplicationPolicy.java | 20 ++--
 .../kafka/connect/mirror/MirrorClientConfig.java | 14 ++
 .../connect/mirror/ReplicationPolicyTest.java | 53 ++
 .../connect/mirror/MirrorConnectorConfig.java | 11 +
 4 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 9de50b603d7..65a98947d39 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
     public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
     public static final String SEPARATOR_DEFAULT = ".";
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
+    public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true;
+
     private String separator = SEPARATOR_DEFAULT;
     private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
+    private boolean isInternalTopicSeparatorEnabled = true;
 
     @Override
     public void configure(Map<String, ?> props) {
@@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
             separator = (String) props.get(SEPARATOR_CONFIG);
             log.info("Using custom remote topic separator: '{}'", separator);
             separatorPattern = Pattern.compile(Pattern.quote(separator));
+
+            if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+                isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
+                if (!isInternalTopicSeparatorEnabled) {
+                    log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator);
+                }
+            }
         }
     }
 
@@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
         }
     }
 
+    private String internalSeparator() {
+        return isInternalTopicSeparatorEnabled ? separator : ".";
+    }
     private String internalSuffix() {
-        return separator + "internal";
+        return internalSeparator() + "internal";
     }
 
     private String checkpointsTopicSuffix() {
-        return separator + "checkpoints" + internalSuffix();
+        return internalSeparator() + "checkpoints" + internalSuffix();
     }
 
     @Override
     public String offsetSyncsTopic(String clusterAlias) {
-        return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
+        return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
     }
 
     @Override
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index bf1bd4826e1..477459895c5 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -59,6 +59,14 @@ public class MirrorClientConfig extends AbstractConfig {
     public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
         DefaultReplicationPolicy.SEPARATOR_DEFAULT;
 
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
+    public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC =
+        "Whether to use replication.policy.separator to control the names of topics used for checkpoints and offset syncs. " +
+        "By default, custom separators are used in these topic
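
Note on the KAFKA-15102 diffs above: the effect of the new flag on internal topic names may be easier to see in isolation. The following is a minimal standalone sketch, not the Kafka class itself. The class name InternalTopicNamingSketch, its constructor, and the public visibility of checkpointsTopicSuffix() are assumptions made for illustration; only the bodies of internalSeparator(), internalSuffix(), checkpointsTopicSuffix() and offsetSyncsTopic() are taken from the diff.

```java
// Hypothetical standalone illustration of the naming logic in the diff above.
// It is not part of Kafka; only the four method bodies mirror the commit.
public class InternalTopicNamingSketch {
    private final String separator;
    private final boolean isInternalTopicSeparatorEnabled;

    public InternalTopicNamingSketch(String separator, boolean isInternalTopicSeparatorEnabled) {
        this.separator = separator;
        this.isInternalTopicSeparatorEnabled = isInternalTopicSeparatorEnabled;
    }

    // Falls back to "." for internal topics when the flag is disabled.
    private String internalSeparator() {
        return isInternalTopicSeparatorEnabled ? separator : ".";
    }

    private String internalSuffix() {
        return internalSeparator() + "internal";
    }

    // Public here only so the sketch can print it; private in the diff.
    public String checkpointsTopicSuffix() {
        return internalSeparator() + "checkpoints" + internalSuffix();
    }

    public String offsetSyncsTopic(String clusterAlias) {
        return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
    }

    public static void main(String[] args) {
        InternalTopicNamingSketch enabled = new InternalTopicNamingSketch("_", true);
        InternalTopicNamingSketch disabled = new InternalTopicNamingSketch("_", false);
        System.out.println(enabled.offsetSyncsTopic("backup"));   // custom separator applied
        System.out.println(disabled.offsetSyncsTopic("backup"));  // "." used instead
        System.out.println(enabled.checkpointsTopicSuffix());
        System.out.println(disabled.checkpointsTopicSuffix());
    }
}
```

With a custom separator of "_" and the alias "backup" (both arbitrary example values), the enabled case yields mm2-offset-syncs_backup_internal, while the disabled case yields mm2-offset-syncs.backup.internal.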
[kafka] branch trunk updated: KAFKA-14682: Report Mockito unused stubbings during Jenkins build (#14186)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new d91c9bd2b59 KAFKA-14682: Report Mockito unused stubbings during Jenkins build (#14186)
d91c9bd2b59 is described below

commit d91c9bd2b594719cee629b6c057204fb0de0d1a2
Author: Chris Egerton
AuthorDate: Sat Aug 12 16:52:49 2023 -0400

    KAFKA-14682: Report Mockito unused stubbings during Jenkins build (#14186)

    * KAFKA-14682: Report Mockito unused stubbings during Jenkins build

    * DO NOT MERGE: Add test case that should fail during Jenkins build

    * Revert "DO NOT MERGE: Add test case that should fail during Jenkins build"

    This reverts commit 8418b835ecb49fa10da04c7a997c7e982a8c4a47.
---
 Jenkinsfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Jenkinsfile b/Jenkinsfile
index 70f956bb4fc..d5d8909047f 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -29,7 +29,7 @@ def isChangeRequest(env) {
   env.CHANGE_ID != null && !env.CHANGE_ID.isEmpty()
 }
 
-def doTest(env, target = "unitTest integrationTest") {
+def doTest(env, target = "test") {
   sh """./gradlew -PscalaVersion=$SCALA_VERSION ${target} \
       --profile --continue -PkeepAliveMode="session" -PtestLoggingEvents=started,passed,skipped,failed \
       -PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=10"""
[kafka] branch trunk updated: KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new b9936d6292f KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143)
b9936d6292f is described below

commit b9936d6292f3d7e76260b91e96520f94d5bc9bd7
Author: Yash Mayya
AuthorDate: Thu Aug 3 18:07:35 2023 +0100

    KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143)

    Reviewers: Chris Egerton
---
 .../main/java/org/apache/kafka/connect/util/RetryUtil.java | 2 +-
 .../java/org/apache/kafka/connect/util/RetryUtilTest.java| 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
index 174f9be0ab8..8babe4ebfc8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -80,7 +80,7 @@ public class RetryUtil {
         final long end = time.milliseconds() + timeoutMs;
         int attempt = 0;
-        Throwable lastError = null;
+        Throwable lastError;
         do {
             attempt++;
             try {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
index 08e2157c7be..b8884b5ada8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
@@ -16,10 +16,6 @@
  */
 package org.apache.kafka.connect.util;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.MockTime;
@@ -29,13 +25,17 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.time.Duration;
 import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 
-@RunWith(PowerMockRunner.class)
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RetryUtilTest {
 
     private final Time mockTime = new MockTime(10);
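
Note on the RetryUtil.java hunk above: dropping the "= null" initializer compiles because a do-while body always executes at least once, so the compiler can prove the variable is assigned before it is read. The following is a simplified sketch of that pattern, assuming an attempt-count limit rather than the timeout used by the real method; the class RetryLoopSketch and its retry() signature are hypothetical and are not Kafka's RetryUtil API.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified retry loop; it bounds retries by attempt count,
// whereas the real RetryUtil shown in the diff is time-based.
public class RetryLoopSketch {

    public static <T> T retry(Callable<T> callable, int maxAttempts) throws Exception {
        int attempt = 0;
        // No initializer needed: the do-while body runs at least once, and every
        // path that reaches the loop condition has assigned lastError.
        Exception lastError;
        do {
            attempt++;
            try {
                return callable.call();
            } catch (Exception e) {
                lastError = e;
            }
        } while (attempt < maxAttempts);
        throw lastError;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("not ready yet");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```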
[kafka] branch 3.5 updated: KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.5 by this push:
     new 3f7c0c83d6f KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)
3f7c0c83d6f is described below

commit 3f7c0c83d6f761a49d12d182beec9c416ac96012
Author: Yash Mayya
AuthorDate: Tue Jul 25 14:03:29 2023 +0100

    KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)

    Reviewers: Chris Egerton
---
 .../connect/runtime/AbstractWorkerSourceTask.java | 8 ++-
 .../runtime/ExactlyOnceWorkerSourceTask.java | 8 ++-
 .../org/apache/kafka/connect/runtime/Worker.java | 11 ++--
 .../kafka/connect/runtime/WorkerSinkTask.java | 12 +++-
 .../kafka/connect/runtime/WorkerSourceTask.java| 8 ++-
 .../runtime/AbstractWorkerSourceTaskTest.java | 38 +++--
 .../connect/runtime/ErrorHandlingTaskTest.java | 58 +--
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java | 2 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java | 66 +++---
 .../runtime/WorkerSinkTaskThreadedTest.java| 3 +-
 .../connect/runtime/WorkerSourceTaskTest.java | 2 +-
 11 files changed, 155 insertions(+), 61 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 6d5446d9c21..4f9e0936ee0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -65,6 +66,7 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
@@ -195,6 +197,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
     private final boolean topicTrackingEnabled;
     private final TopicCreation topicCreation;
     private final Executor closeExecutor;
+    private final Supplier<List<ErrorReporter>> errorReportersSupplier;
 
     // Visible for testing
     List<SourceRecord> toSend;
@@ -224,7 +227,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
                                        Time time,
                                        RetryWithToleranceOperator retryWithToleranceOperator,
                                        StatusBackingStore statusBackingStore,
-                                       Executor closeExecutor) {
+                                       Executor closeExecutor,
+                                       Supplier<List<ErrorReporter>> errorReportersSupplier) {
 
         super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
                 retryWithToleranceOperator, time, statusBackingStore);
@@ -242,6 +246,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
         this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
         this.closeExecutor = closeExecutor;
         this.sourceTaskContext = sourceTaskContext;
+        this.errorReportersSupplier = errorReportersSupplier;
 
         this.stopRequestedLatch = new CountDownLatch(1);
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
@@ -261,6 +266,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
 
     @Override
     protected void initializeAndStart() {
+        retryWithToleranceOperator.reporters(errorReportersSupplier.get());
         prepareToInitializeTask();
         offsetStore.start();
         // If we try to start the task at all by invoking initialize, then count this as
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 30dafaac81d..da58b4fafc9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/
[kafka] branch trunk updated: KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 08b3820d5e3 KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)
08b3820d5e3 is described below

commit 08b3820d5e3994e137fa5ab917adc37a30910144
Author: Yash Mayya
AuthorDate: Tue Jul 25 14:03:29 2023 +0100

    KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)

    Reviewers: Chris Egerton
---
 .../connect/runtime/AbstractWorkerSourceTask.java | 8 ++-
 .../runtime/ExactlyOnceWorkerSourceTask.java | 8 ++-
 .../org/apache/kafka/connect/runtime/Worker.java | 11 ++--
 .../kafka/connect/runtime/WorkerSinkTask.java | 12 +++--
 .../kafka/connect/runtime/WorkerSourceTask.java| 8 ++-
 .../runtime/AbstractWorkerSourceTaskTest.java | 38 --
 .../connect/runtime/ErrorHandlingTaskTest.java | 58 +++---
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java | 2 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java | 56 +++--
 .../runtime/WorkerSinkTaskThreadedTest.java| 3 +-
 .../connect/runtime/WorkerSourceTaskTest.java | 2 +-
 11 files changed, 150 insertions(+), 56 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 6d5446d9c21..4f9e0936ee0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -65,6 +66,7 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
@@ -195,6 +197,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
     private final boolean topicTrackingEnabled;
     private final TopicCreation topicCreation;
     private final Executor closeExecutor;
+    private final Supplier<List<ErrorReporter>> errorReportersSupplier;
 
     // Visible for testing
     List<SourceRecord> toSend;
@@ -224,7 +227,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
                                        Time time,
                                        RetryWithToleranceOperator retryWithToleranceOperator,
                                        StatusBackingStore statusBackingStore,
-                                       Executor closeExecutor) {
+                                       Executor closeExecutor,
+                                       Supplier<List<ErrorReporter>> errorReportersSupplier) {
 
         super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
                 retryWithToleranceOperator, time, statusBackingStore);
@@ -242,6 +246,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
         this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
         this.closeExecutor = closeExecutor;
         this.sourceTaskContext = sourceTaskContext;
+        this.errorReportersSupplier = errorReportersSupplier;
 
         this.stopRequestedLatch = new CountDownLatch(1);
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
@@ -261,6 +266,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
 
     @Override
     protected void initializeAndStart() {
+        retryWithToleranceOperator.reporters(errorReportersSupplier.get());
         prepareToInitializeTask();
         offsetStore.start();
         // If we try to start the task at all by invoking initialize, then count this as
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index f8f0d9f393c..5a123f2cd1e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main
[kafka] branch C0urante-patch-1 deleted (was b3864a1fa56)
cegerton pushed a change to branch C0urante-patch-1 in repository https://gitbox.apache.org/repos/asf/kafka.git was b3864a1fa56 MINOR: Downgrade log level for conflicting Connect plugin aliases The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch trunk updated: MINOR: Downgrade log level for conflicting Connect plugin aliases (#14081)
cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 58b8c5c7b1c MINOR: Downgrade log level for conflicting Connect plugin aliases (#14081) 58b8c5c7b1c is described below commit 58b8c5c7b1c7d28073bdef4f13fee16e5122a6d2 Author: Chris Egerton AuthorDate: Tue Jul 25 05:12:46 2023 -0700 MINOR: Downgrade log level for conflicting Connect plugin aliases (#14081) Reviewers: Greg Harris --- .../java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index e88bfa1b03d..88b1fc0484a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -408,7 +408,7 @@ public class PluginUtils { if (classNames.size() == 1) { aliases.put(alias, classNames.stream().findAny().get()); } else { -log.warn("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames); +log.debug("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames); } } return aliases;