This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3859e1d add time for rest service request and some minor optimize (#112)
3859e1d is described below
commit 3859e1db11da09b1e2186c8c4c90453e22fadb45
Author: legendtkl <[email protected]>
AuthorDate: Thu Feb 23 10:39:39 2023 +0800
add time for rest service request and some minor optimize (#112)
---
.../src/main/java/org/apache/doris/flink/rest/RestService.java | 10 +++++++---
.../org/apache/doris/flink/table/DorisDynamicTableSink.java | 2 +-
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index ac32867..c774d68 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -121,7 +121,7 @@ public class RestService implements Serializable {
try {
String response;
if (request instanceof HttpGet) {
- response = getConnectionGet(request.getURI().toString(),
options.getUsername(), options.getPassword(), logger);
+ response = getConnectionGet(request,
options.getUsername(), options.getPassword(), logger);
} else {
response = getConnectionPost(request,
options.getUsername(), options.getPassword(), logger);
}
@@ -162,6 +162,8 @@ public class RestService implements Serializable {
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
+ conn.setConnectTimeout(request.getConfig().getConnectTimeout());
+ conn.setReadTimeout(request.getConfig().getSocketTimeout());
PrintWriter out = new PrintWriter(conn.getOutputStream());
// send request params
out.print(res);
@@ -171,14 +173,16 @@ public class RestService implements Serializable {
return parseResponse(conn, logger);
}
- private static String getConnectionGet(String request, String user, String
passwd, Logger logger) throws IOException {
- URL realUrl = new URL(request);
+ private static String getConnectionGet(HttpRequestBase request, String
user, String passwd, Logger logger) throws IOException {
+ URL realUrl = new URL(request.getURI().toString());
// open connection
HttpURLConnection connection = (HttpURLConnection)
realUrl.openConnection();
String authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " +
authEncoding);
connection.connect();
+ connection.setConnectTimeout(request.getConfig().getConnectTimeout());
+ connection.setReadTimeout(request.getConfig().getSocketTimeout());
return parseResponse(connection, logger);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 2bba98b..ae4e137 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -80,7 +80,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
- boolean deletable = RestService.isUniqueKeyType(options, readOptions,
LOG) && executionOptions.getDeletable();
+ boolean deletable = executionOptions.getDeletable() &&
RestService.isUniqueKeyType(options, readOptions, LOG);
if (!loadProperties.containsKey(COLUMNS_KEY)) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length >
0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]