This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3859e1d  add time for rest service request and some minor optimize 
(#112)
3859e1d is described below

commit 3859e1db11da09b1e2186c8c4c90453e22fadb45
Author: legendtkl <[email protected]>
AuthorDate: Thu Feb 23 10:39:39 2023 +0800

    add time for rest service request and some minor optimize (#112)
---
 .../src/main/java/org/apache/doris/flink/rest/RestService.java | 10 +++++++---
 .../org/apache/doris/flink/table/DorisDynamicTableSink.java    |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index ac32867..c774d68 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -121,7 +121,7 @@ public class RestService implements Serializable {
             try {
                 String response;
                 if (request instanceof HttpGet) {
-                    response = getConnectionGet(request.getURI().toString(), 
options.getUsername(), options.getPassword(), logger);
+                    response = getConnectionGet(request, 
options.getUsername(), options.getPassword(), logger);
                 } else {
                     response = getConnectionPost(request, 
options.getUsername(), options.getPassword(), logger);
                 }
@@ -162,6 +162,8 @@ public class RestService implements Serializable {
         String res = IOUtils.toString(content);
         conn.setDoOutput(true);
         conn.setDoInput(true);
+        conn.setConnectTimeout(request.getConfig().getConnectTimeout());
+        conn.setReadTimeout(request.getConfig().getSocketTimeout());
         PrintWriter out = new PrintWriter(conn.getOutputStream());
         // send request params
         out.print(res);
@@ -171,14 +173,16 @@ public class RestService implements Serializable {
         return parseResponse(conn, logger);
     }
 
-    private static String getConnectionGet(String request, String user, String 
passwd, Logger logger) throws IOException {
-        URL realUrl = new URL(request);
+    private static String getConnectionGet(HttpRequestBase request, String 
user, String passwd, Logger logger) throws IOException {
+        URL realUrl = new URL(request.getURI().toString());
         // open connection
         HttpURLConnection connection = (HttpURLConnection) 
realUrl.openConnection();
         String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
         connection.setRequestProperty("Authorization", "Basic " + 
authEncoding);
 
         connection.connect();
+        connection.setConnectTimeout(request.getConfig().getConnectTimeout());
+        connection.setReadTimeout(request.getConfig().getSocketTimeout());
         return parseResponse(connection, logger);
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 2bba98b..ae4e137 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -80,7 +80,7 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         Properties loadProperties = executionOptions.getStreamLoadProp();
-        boolean deletable = RestService.isUniqueKeyType(options, readOptions, 
LOG) && executionOptions.getDeletable();
+        boolean deletable = executionOptions.getDeletable() && 
RestService.isUniqueKeyType(options, readOptions, LOG);
         if (!loadProperties.containsKey(COLUMNS_KEY)) {
             String[] fieldNames = tableSchema.getFieldNames();
             Preconditions.checkState(fieldNames != null && fieldNames.length > 
0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to