This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new e9b72b1 KAFKA-9747: Creating connect reconfiguration URL safely
(#11174)
e9b72b1 is described below
commit e9b72b13c8a9b4cb8eaa2a9b0a9207acb40edfc9
Author: Andras Katona <[email protected]>
AuthorDate: Thu Sep 2 10:09:55 2021 +0200
KAFKA-9747: Creating connect reconfiguration URL safely (#11174)
* The URL wasn't URL-encoded when forwarding task reconfiguration to the
leader Connect worker
* Handle previously swallowed errors in the Connect RestClient
Reviewers: Mickael Maison <[email protected]>, Viktor Somogyi-Vass
<[email protected]>
Co-authored-by: Andras Katona <[email protected]>
Co-authored-by: Daniel Urban <[email protected]>
---
.../kafka/connect/runtime/distributed/DistributedHerder.java | 9 +++++++--
.../java/org/apache/kafka/connect/runtime/rest/RestClient.java | 3 +++
.../java/org/apache/kafka/connect/runtime/rest/RestServer.java | 7 -------
3 files changed, 10 insertions(+), 9 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cb2c4da..0c668a0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -50,7 +50,6 @@ import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
@@ -66,6 +65,7 @@ import org.slf4j.Logger;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1463,7 +1463,12 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
"because the URL of the leader's REST
interface is empty!"), null);
return;
}
- String reconfigUrl = RestServer.urlJoin(leaderUrl,
"/connectors/" + connName + "/tasks");
+ String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+ .path("connectors")
+ .path(connName)
+ .path("tasks")
+ .build()
+ .toString();
log.trace("Forwarding task configurations for
connector {} to leader", connName);
RestClient.httpRequest(reconfigUrl, "POST", null,
rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
cb.onCompletion(null, null);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 58d7df0..81c5a84 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -142,6 +142,9 @@ public class RestClient {
} catch (IOException | InterruptedException | TimeoutException |
ExecutionException e) {
log.error("IO error forwarding REST request: ", e);
throw new
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to
forward REST request: " + e.getMessage(), e);
+ } catch (Throwable t) {
+ log.error("Error forwarding REST request", t);
+ throw new
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to
forward REST request: " + t.getMessage(), t);
} finally {
try {
client.stop();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 136c616..4be40c1 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -471,13 +471,6 @@ public class RestServer {
}
- public static String urlJoin(String base, String path) {
- if (base.endsWith("/") && path.startsWith("/"))
- return base + path.substring(1);
- else
- return base + path;
- }
-
/**
* Register header filter to ServletContextHandler.
* @param context The serverlet context handler