This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 03082bc5eca [improve][io] Add PUT method support to HttpSink (#25133)
03082bc5eca is described below
commit 03082bc5eca615d29595c0ebc80bfc4bb6c89423
Author: Dream95 <[email protected]>
AuthorDate: Wed Jan 14 21:55:20 2026 +0800
[improve][io] Add PUT method support to HttpSink (#25133)
Signed-off-by: Dream95 <[email protected]>
---
.../main/java/org/apache/pulsar/io/http/HttpSink.java | 2 +-
.../org/apache/pulsar/io/http/HttpSinkConfig.java | 8 ++++++++
.../java/org/apache/pulsar/io/http/HttpSinkTest.java | 19 ++++++++++++++++++-
3 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
index 31b5053ba7a..c59fcaadfb1 100644
--- a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
@@ -70,7 +70,7 @@ public class HttpSink implements Sink<GenericObject> {
byte[] bytes = mapper.writeValueAsBytes(json);
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uri)
- .POST(HttpRequest.BodyPublishers.ofByteArray(bytes));
+ .method(httpSinkConfig.getHttpMethod().name(),
HttpRequest.BodyPublishers.ofByteArray(bytes));
httpSinkConfig.getHeaders().forEach(builder::header);
record.getProperties().forEach((k, v) ->
builder.header("PulsarProperties-" + k, v));
record.getTopicName().ifPresent(topic -> builder.header("PulsarTopic",
topic));
diff --git
a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
index 2d51e7412fc..a6dc6051d34 100644
--- a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
@@ -46,6 +46,14 @@ public class HttpSinkConfig implements Serializable {
help = "The list of default headers added to each request")
private Map<String, String> headers = new HashMap<>();
+ @FieldDoc(defaultValue = "POST",
+ help = "The HTTP method to use in the request,support POST/PUT")
+ private HttpMethod httpMethod = HttpMethod.POST;
+
+ public enum HttpMethod {
+ POST,
+ PUT
+ }
public static HttpSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), HttpSinkConfig.class);
diff --git
a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
index 6ad8f45e731..7f09030b2f9 100644
--- a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
+++ b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
@@ -24,11 +24,14 @@ import static
com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.put;
+import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder;
import java.io.IOException;
import java.sql.Time;
import java.sql.Timestamp;
@@ -70,6 +73,8 @@ public class HttpSinkTest {
configureFor(server.port());
stubFor(post(urlPathEqualTo("/"))
.willReturn(aResponse().withStatus(200)));
+ stubFor(put(urlPathEqualTo("/"))
+ .willReturn(aResponse().withStatus(200)));
}
@AfterClass
@@ -233,12 +238,19 @@ public class HttpSinkTest {
}
private void test(Schema<?> schema, GenericObject genericObject, String
responseBody) throws Exception {
+ test(HttpSinkConfig.HttpMethod.PUT.name(), schema, genericObject,
responseBody);
+ test(HttpSinkConfig.HttpMethod.POST.name(), schema, genericObject,
responseBody);
+ }
+
+ private void test(String httpMethod, Schema<?> schema,
+ GenericObject genericObject, String responseBody) throws
Exception {
HttpSink httpSink = new HttpSink();
Map<String, Object> config = new HashMap<>();
config.put("url", server.baseUrl());
Map<String, String> headers = new HashMap<>();
headers.put("header-name", "header-value");
config.put("headers", headers);
+ config.put("httpMethod", httpMethod);
httpSink.open(config, null);
long now = 1662418008000L;
@@ -428,8 +440,13 @@ public class HttpSinkTest {
}
};
httpSink.write(record);
+ RequestPatternBuilder requestPatternBuilder = switch (httpMethod) {
+ case "POST" -> postRequestedFor(urlEqualTo("/"));
+ case "PUT" -> putRequestedFor(urlEqualTo("/"));
+ default -> throw new IllegalArgumentException("UnSupport
httpMethod: " + httpMethod);
+ };
- verify(postRequestedFor(urlEqualTo("/"))
+ verify(requestPatternBuilder
.withRequestBody(equalToJson(responseBody))
.withHeader("Content-Type", equalTo("application/json"))
.withHeader("header-name", equalTo("header-value"))