This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d24eb89a4 fix: respect tls spec for segment push client (#18530)
e4d24eb89a4 is described below

commit e4d24eb89a4a16f997f611154b56210c75f97a33
Author: Cc <[email protected]>
AuthorDate: Thu May 28 14:32:02 2026 +0800

    fix: respect tls spec for segment push client (#18530)
    
    Segment push used the shared default FileUploadDownloadClient, so job-level 
TlsSpec key/trust store and timeout settings were ignored for controller upload 
calls.
    
    Create TLS-aware clients for segment push and consistent-data push 
controller calls, avoid auto-renewal resources for short-lived clients, and 
cover the public push paths with local mTLS tests.
    
    Co-authored-by: wolfkill <[email protected]>
---
 .../common/utils/FileUploadDownloadClient.java     |  39 +-
 .../apache/pinot/common/utils/tls/TlsUtils.java    |  66 +++-
 .../local/utils/ConsistentDataPushUtils.java       | 178 +++++----
 .../segment/local/utils/SegmentPushUtils.java      | 404 ++++++++++++---------
 .../segment/local/utils/SegmentPushUtilsTest.java  | 300 +++++++++++++++
 5 files changed, 721 insertions(+), 266 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 259652ad7c0..6acd94f097b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -894,8 +894,17 @@ public class FileUploadDownloadClient implements 
AutoCloseable {
   public Map<String, List<String>> getSegments(URI controllerBaseUri, String 
rawTableName,
       @Nullable TableType tableType, boolean excludeReplacedSegments, 
@Nullable AuthProvider authProvider)
       throws Exception {
+    return getSegments(controllerBaseUri, rawTableName, tableType, 
excludeReplacedSegments, authProvider,
+        HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+  }
+
+  /// Returns table segments using an explicit socket timeout for the 
controller request.
+  public Map<String, List<String>> getSegments(URI controllerBaseUri, String 
rawTableName,
+      @Nullable TableType tableType, boolean excludeReplacedSegments, 
@Nullable AuthProvider authProvider,
+      int socketTimeoutMs)
+      throws Exception {
     return getSegments(controllerBaseUri, rawTableName, tableType, 
excludeReplacedSegments, Long.MIN_VALUE,
-        Long.MAX_VALUE, false, authProvider);
+        Long.MAX_VALUE, false, authProvider, socketTimeoutMs);
   }
 
   /**
@@ -916,6 +925,15 @@ public class FileUploadDownloadClient implements 
AutoCloseable {
       @Nullable TableType tableType, boolean excludeReplacedSegments, long 
startTimestamp, long endTimestamp,
       boolean excludeOverlapping, @Nullable AuthProvider authProvider)
       throws Exception {
+    return getSegments(controllerBaseUri, rawTableName, tableType, 
excludeReplacedSegments, startTimestamp,
+        endTimestamp, excludeOverlapping, authProvider, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+  }
+
+  /// Returns table segments using an explicit socket timeout for the 
controller request.
+  public Map<String, List<String>> getSegments(URI controllerBaseUri, String 
rawTableName,
+      @Nullable TableType tableType, boolean excludeReplacedSegments, long 
startTimestamp, long endTimestamp,
+      boolean excludeOverlapping, @Nullable AuthProvider authProvider, int 
socketTimeoutMs)
+      throws Exception {
     List<String> tableTypes;
     if (tableType == null) {
       tableTypes = Arrays.asList(TableType.OFFLINE.toString(), 
TableType.REALTIME.toString());
@@ -936,7 +954,7 @@ public class FileUploadDownloadClient implements 
AutoCloseable {
       RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0).attempt(() 
-> {
         try {
           SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(
-              _httpClient.sendRequest(requestBuilder.build(), 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
+              _httpClient.sendRequest(requestBuilder.build(), 
socketTimeoutMs));
           LOGGER.info("Response {}: {} received for GET request to URI: {}", 
response.getStatusCode(),
               response.getResponse(), uri);
           tableTypeToSegments.put(tableTypeToFilter,
@@ -1150,9 +1168,16 @@ public class FileUploadDownloadClient implements 
AutoCloseable {
   public SimpleHttpResponse startReplaceSegments(URI uri, 
StartReplaceSegmentsRequest startReplaceSegmentsRequest,
       @Nullable AuthProvider authProvider)
       throws IOException, HttpErrorStatusException {
+    return startReplaceSegments(uri, startReplaceSegmentsRequest, 
authProvider, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+  }
+
+  /// Starts the consistent-push replace protocol using an explicit socket 
timeout.
+  public SimpleHttpResponse startReplaceSegments(URI uri, 
StartReplaceSegmentsRequest startReplaceSegmentsRequest,
+      @Nullable AuthProvider authProvider, int socketTimeoutMs)
+      throws IOException, HttpErrorStatusException {
     return HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(
         getStartReplaceSegmentsRequest(uri, 
JsonUtils.objectToString(startReplaceSegmentsRequest), authProvider),
-        HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
+        socketTimeoutMs));
   }
 
   /**
@@ -1197,8 +1222,14 @@ public class FileUploadDownloadClient implements 
AutoCloseable {
    */
   public SimpleHttpResponse revertReplaceSegments(URI uri, @Nullable 
AuthProvider authProvider)
       throws IOException, HttpErrorStatusException {
+    return revertReplaceSegments(uri, authProvider, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+  }
+
+  /// Reverts the consistent-push replace protocol using an explicit socket 
timeout.
+  public SimpleHttpResponse revertReplaceSegments(URI uri, @Nullable 
AuthProvider authProvider, int socketTimeoutMs)
+      throws IOException, HttpErrorStatusException {
     return HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(
-        getRevertReplaceSegmentRequest(uri, authProvider)));
+        getRevertReplaceSegmentRequest(uri, authProvider), socketTimeoutMs));
   }
 
   /**
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java
index 74cf0023dd1..ec25628f51c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/tls/TlsUtils.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
@@ -250,30 +251,59 @@ public final class TlsUtils {
    */
   public static void installDefaultSSLSocketFactory(String keyStoreType, 
String keyStorePath, String keyStorePassword,
       String trustStoreType, String trustStorePath, String trustStorePassword) 
{
+    SSLContext sc = createSslContext(keyStoreType, keyStorePath, 
keyStorePassword, trustStoreType, trustStorePath,
+        trustStorePassword);
+    // HttpsURLConnection
+    HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+    setSslContext(sc);
+    logTlsDiagnosticsOnce("https.default", sc, null, false);
+  }
+
+  /// Creates a client-side SSL context from key/trust store settings without 
mutating JVM defaults.
+  ///
+  /// If both store paths are null, the returned context uses default key and 
trust managers. If either store path is
+  /// provided, its type and password must also be provided. File-backed 
stores are auto-renewed.
+  public static SSLContext createSslContext(@Nullable String keyStoreType, 
@Nullable String keyStorePath,
+      @Nullable String keyStorePassword, @Nullable String trustStoreType, 
@Nullable String trustStorePath,
+      @Nullable String trustStorePassword) {
+    return createSslContext(keyStoreType, keyStorePath, keyStorePassword, 
trustStoreType, trustStorePath,
+        trustStorePassword, true);
+  }
+
+  /// Creates a client-side SSL context without enabling file-store 
auto-renewal.
+  ///
+  /// This is intended for short-lived clients whose owners can close the HTTP 
client but do not own the renewal
+  /// executors/watch services created by auto-renewal.
+  public static SSLContext createSslContextWithoutAutoRenewal(@Nullable String 
keyStoreType,
+      @Nullable String keyStorePath, @Nullable String keyStorePassword, 
@Nullable String trustStoreType,
+      @Nullable String trustStorePath, @Nullable String trustStorePassword) {
+    return createSslContext(keyStoreType, keyStorePath, keyStorePassword, 
trustStoreType, trustStorePath,
+        trustStorePassword, false);
+  }
+
+  private static SSLContext createSslContext(@Nullable String keyStoreType, 
@Nullable String keyStorePath,
+      @Nullable String keyStorePassword, @Nullable String trustStoreType, 
@Nullable String trustStorePath,
+      @Nullable String trustStorePassword, boolean enableAutoRenewal) {
     try {
       SecureRandom secureRandom = new SecureRandom();
-      SSLContext sc;
       if (keyStorePath == null && trustStorePath == null) {
         // When neither keyStorePath nor trustStorePath is provided, a 
SSLFactory cannot be created. create SSLContext
         // directly and use the default key manager and trust manager.
-        sc = SSLContext.getInstance(SSL_CONTEXT_PROTOCOL);
-        sc.init(null, null, secureRandom);
-      } else {
-        SSLFactory sslFactory =
-            RenewableTlsUtils.createSSLFactory(keyStoreType, keyStorePath, 
keyStorePassword, trustStoreType,
-                trustStorePath, trustStorePassword, SSL_CONTEXT_PROTOCOL, 
secureRandom, true, false);
-        if (isKeyOrTrustStorePathNullOrHasFileScheme(keyStorePath) && 
isKeyOrTrustStorePathNullOrHasFileScheme(
-            trustStorePath)) {
-          
RenewableTlsUtils.enableAutoRenewalFromFileStoreForSSLFactory(sslFactory, 
keyStoreType, keyStorePath,
-              keyStorePassword, trustStoreType, trustStorePath, 
trustStorePassword, SSL_CONTEXT_PROTOCOL, secureRandom,
-              PinotInsecureMode::isPinotInInsecureMode);
-        }
-        sc = sslFactory.getSslContext();
+        SSLContext sslContext = SSLContext.getInstance(SSL_CONTEXT_PROTOCOL);
+        sslContext.init(null, null, secureRandom);
+        return sslContext;
+      }
+
+      SSLFactory sslFactory =
+          RenewableTlsUtils.createSSLFactory(keyStoreType, keyStorePath, 
keyStorePassword, trustStoreType,
+              trustStorePath, trustStorePassword, SSL_CONTEXT_PROTOCOL, 
secureRandom, true, false);
+      if (enableAutoRenewal && 
isKeyOrTrustStorePathNullOrHasFileScheme(keyStorePath)
+          && isKeyOrTrustStorePathNullOrHasFileScheme(trustStorePath)) {
+        
RenewableTlsUtils.enableAutoRenewalFromFileStoreForSSLFactory(sslFactory, 
keyStoreType, keyStorePath,
+            keyStorePassword, trustStoreType, trustStorePath, 
trustStorePassword, SSL_CONTEXT_PROTOCOL, secureRandom,
+            PinotInsecureMode::isPinotInInsecureMode);
       }
-      // HttpsURLConnection
-      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
-      setSslContext(sc);
-      logTlsDiagnosticsOnce("https.default", sc, null, false);
+      return sslFactory.getSslContext();
     } catch (GenericSSLContextException | GeneralSecurityException e) {
       throw new IllegalStateException("Could not initialize SSL support", e);
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
index 8d1cc9bde58..23ad163c294 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
@@ -31,7 +31,6 @@ import 
org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -51,7 +50,6 @@ public class ConsistentDataPushUtils {
   }
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsistentDataPushUtils.class);
-  private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0);
   public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
 
@@ -113,42 +111,48 @@ public class ConsistentDataPushUtils {
     Map<URI, URI> segmentsUris = getStartReplaceSegmentUris(spec, 
rawTableName);
     AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
     LOGGER.info("Start replace segment URIs: {}", segmentsUris);
+    FileUploadDownloadClient fileUploadDownloadClient = 
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+    int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
 
-    for (Map.Entry<URI, URI> entry : segmentsUris.entrySet()) {
-      URI controllerUri = entry.getKey();
-      URI startSegmentUri = entry.getValue();
-      List<String> segmentsFrom = uriToSegmentsFrom.get(controllerUri);
+    try {
+      for (Map.Entry<URI, URI> entry : segmentsUris.entrySet()) {
+        URI controllerUri = entry.getKey();
+        URI startSegmentUri = entry.getValue();
+        List<String> segmentsFrom = uriToSegmentsFrom.get(controllerUri);
 
-      StartReplaceSegmentsRequest startReplaceSegmentsRequest =
-          new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
-      DEFAULT_RETRY_POLICY.attempt(() -> {
-        try {
-          SimpleHttpResponse response =
-              
FILE_UPLOAD_DOWNLOAD_CLIENT.startReplaceSegments(startSegmentUri, 
startReplaceSegmentsRequest,
-                  authProvider);
-          String responseString = response.getResponse();
-          LOGGER.info(
-              "Got response {}: {} while sending start replace segment request 
for table: {}, uploadURI: {}, request:"
-                  + " {}", response.getStatusCode(), responseString, 
rawTableName, startSegmentUri,
-              startReplaceSegmentsRequest);
-          String segmentLineageEntryId =
-              
JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText();
-          uriToLineageEntryIdMap.put(controllerUri, segmentLineageEntryId);
-          return true;
-        } catch (SocketTimeoutException se) {
-          // In case of the timeout, we should re-try.
-          return false;
-        } catch (HttpErrorStatusException e) {
-          if (e.getStatusCode() >= 500) {
+        StartReplaceSegmentsRequest startReplaceSegmentsRequest =
+            new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
+        DEFAULT_RETRY_POLICY.attempt(() -> {
+          try {
+            SimpleHttpResponse response =
+                fileUploadDownloadClient.startReplaceSegments(startSegmentUri, 
startReplaceSegmentsRequest,
+                    authProvider, socketTimeoutMs);
+            String responseString = response.getResponse();
+            LOGGER.info(
+                "Got response {}: {} while sending start replace segment 
request for table: {}, uploadURI: {}, request:"
+                    + " {}", response.getStatusCode(), responseString, 
rawTableName, startSegmentUri,
+                startReplaceSegmentsRequest);
+            String segmentLineageEntryId =
+                
JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText();
+            uriToLineageEntryIdMap.put(controllerUri, segmentLineageEntryId);
+            return true;
+          } catch (SocketTimeoutException se) {
+            // In case of the timeout, we should re-try.
             return false;
-          } else {
-            if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
-              LOGGER.error("Table: {} not found when sending request: {}", 
rawTableName, startSegmentUri);
+          } catch (HttpErrorStatusException e) {
+            if (e.getStatusCode() >= 500) {
+              return false;
+            } else {
+              if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
+                LOGGER.error("Table: {} not found when sending request: {}", 
rawTableName, startSegmentUri);
+              }
+              throw e;
             }
-            throw e;
           }
-        }
-      });
+        });
+      }
+    } finally {
+      SegmentPushUtils.closeFileUploadDownloadClient(spec, 
fileUploadDownloadClient);
     }
     return uriToLineageEntryIdMap;
   }
@@ -160,30 +164,35 @@ public class ConsistentDataPushUtils {
       throws Exception {
     AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
     String rawTableName = spec.getTableSpec().getTableName();
-    for (URI controllerUri : uriToLineageEntryIdMap.keySet()) {
-      String segmentLineageEntryId = uriToLineageEntryIdMap.get(controllerUri);
-      URI uri =
-          FileUploadDownloadClient.getEndReplaceSegmentsURI(controllerUri, 
rawTableName, TableType.OFFLINE.toString(),
-              segmentLineageEntryId, false);
-      DEFAULT_RETRY_POLICY.attempt(() -> {
-        try {
-          SimpleHttpResponse response =
-              FILE_UPLOAD_DOWNLOAD_CLIENT.endReplaceSegments(uri, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS,
-                  null, authProvider);
-          LOGGER.info("Got response {}: {} while sending end replace segment 
request for table: {}, uploadURI: {}",
-              response.getStatusCode(), response.getResponse(), rawTableName, 
uri);
-          return true;
-        } catch (SocketTimeoutException se) {
-          // In case of the timeout, we should re-try.
-          return false;
-        } catch (HttpErrorStatusException e) {
-          if (e.getStatusCode() >= 500) {
+    FileUploadDownloadClient fileUploadDownloadClient = 
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+    int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
+    try {
+      for (URI controllerUri : uriToLineageEntryIdMap.keySet()) {
+        String segmentLineageEntryId = 
uriToLineageEntryIdMap.get(controllerUri);
+        URI uri =
+            FileUploadDownloadClient.getEndReplaceSegmentsURI(controllerUri, 
rawTableName, TableType.OFFLINE.toString(),
+                segmentLineageEntryId, false);
+        DEFAULT_RETRY_POLICY.attempt(() -> {
+          try {
+            SimpleHttpResponse response =
+                fileUploadDownloadClient.endReplaceSegments(uri, 
socketTimeoutMs, null, authProvider);
+            LOGGER.info("Got response {}: {} while sending end replace segment 
request for table: {}, uploadURI: {}",
+                response.getStatusCode(), response.getResponse(), 
rawTableName, uri);
+            return true;
+          } catch (SocketTimeoutException se) {
+            // In case of the timeout, we should re-try.
             return false;
-          } else {
-            throw e;
+          } catch (HttpErrorStatusException e) {
+            if (e.getStatusCode() >= 500) {
+              return false;
+            } else {
+              throw e;
+            }
           }
-        }
-      });
+        });
+      }
+    } finally {
+      SegmentPushUtils.closeFileUploadDownloadClient(spec, 
fileUploadDownloadClient);
     }
   }
 
@@ -198,18 +207,25 @@ public class ConsistentDataPushUtils {
       LOGGER.error("Exception when pushing segments. Marking segment lineage 
entry to 'REVERTED'.", exception);
       String rawTableName = spec.getTableSpec().getTableName();
       AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
-      for (Map.Entry<URI, String> entry : uriToLineageEntryIdMap.entrySet()) {
-        String segmentLineageEntryId = entry.getValue();
-        try {
-          URI uri = 
FileUploadDownloadClient.getRevertReplaceSegmentsURI(entry.getKey(), 
rawTableName,
-              TableType.OFFLINE.name(), segmentLineageEntryId, true);
-          SimpleHttpResponse response = 
FILE_UPLOAD_DOWNLOAD_CLIENT.revertReplaceSegments(uri, authProvider);
-          LOGGER.info("Got response {}: {} while sending revert replace 
segment request for table: {}, uploadURI: {}",
-              response.getStatusCode(), response.getResponse(), rawTableName, 
entry.getKey());
-        } catch (URISyntaxException | HttpErrorStatusException | IOException 
e) {
-          LOGGER.error("Exception when sending revert replace segment request 
to controller: {} for table: {}",
-              entry.getKey(), rawTableName, e);
+      FileUploadDownloadClient fileUploadDownloadClient = 
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+      int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
+      try {
+        for (Map.Entry<URI, String> entry : uriToLineageEntryIdMap.entrySet()) 
{
+          String segmentLineageEntryId = entry.getValue();
+          try {
+            URI uri = 
FileUploadDownloadClient.getRevertReplaceSegmentsURI(entry.getKey(), 
rawTableName,
+                TableType.OFFLINE.name(), segmentLineageEntryId, true);
+            SimpleHttpResponse response =
+                fileUploadDownloadClient.revertReplaceSegments(uri, 
authProvider, socketTimeoutMs);
+            LOGGER.info("Got response {}: {} while sending revert replace 
segment request for table: {}, uploadURI: {}",
+                response.getStatusCode(), response.getResponse(), 
rawTableName, entry.getKey());
+          } catch (URISyntaxException | HttpErrorStatusException | IOException 
e) {
+            LOGGER.error("Exception when sending revert replace segment 
request to controller: {} for table: {}",
+                entry.getKey(), rawTableName, e);
+          }
         }
+      } finally {
+        SegmentPushUtils.closeFileUploadDownloadClient(spec, 
fileUploadDownloadClient);
       }
     }
   }
@@ -231,19 +247,25 @@ public class ConsistentDataPushUtils {
   public static Map<URI, List<String>> 
getSegmentsToReplace(SegmentGenerationJobSpec spec, String rawTableName)
       throws Exception {
     Map<URI, List<String>> uriToOfflineSegments = new HashMap<>();
-    for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
-      URI controllerURI;
-      List<String> offlineSegments;
-      try {
-        controllerURI = new URI(pinotClusterSpec.getControllerURI());
-        AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
-        Map<String, List<String>> segments =
-            FILE_UPLOAD_DOWNLOAD_CLIENT.getSegments(controllerURI, 
rawTableName, TableType.OFFLINE, true, authProvider);
-        offlineSegments = segments.get(TableType.OFFLINE.toString());
-        uriToOfflineSegments.put(controllerURI, offlineSegments);
-      } catch (URISyntaxException e) {
-        throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
+    FileUploadDownloadClient fileUploadDownloadClient = 
SegmentPushUtils.getOrCreateFileUploadDownloadClient(spec);
+    int socketTimeoutMs = SegmentPushUtils.getSocketTimeoutMs(spec);
+    try {
+      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+        URI controllerURI;
+        List<String> offlineSegments;
+        try {
+          controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
+          Map<String, List<String>> segments = 
fileUploadDownloadClient.getSegments(controllerURI, rawTableName,
+              TableType.OFFLINE, true, authProvider, socketTimeoutMs);
+          offlineSegments = segments.get(TableType.OFFLINE.toString());
+          uriToOfflineSegments.put(controllerURI, offlineSegments);
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
+        }
       }
+    } finally {
+      SegmentPushUtils.closeFileUploadDownloadClient(spec, 
fileUploadDownloadClient);
     }
     return uriToOfflineSegments;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index d4e39a11a93..aa4655e6627 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.NameValuePair;
 import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
@@ -53,11 +54,14 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.common.utils.http.HttpClientConfig;
+import org.apache.pinot.common.utils.tls.TlsUtils;
 import org.apache.pinot.segment.local.constants.SegmentUploadConstants;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -65,6 +69,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TlsSpec;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -78,6 +83,43 @@ public class SegmentPushUtils implements Serializable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPushUtils.class);
   private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+  private static final String HTTP_CLIENT_CONNECTION_TIMEOUT_CONFIG = 
"http.client.connectionTimeoutMs";
+
+  @VisibleForTesting
+  static FileUploadDownloadClient 
getOrCreateFileUploadDownloadClient(SegmentGenerationJobSpec spec) {
+    TlsSpec tlsSpec = spec.getTlsSpec();
+    if (tlsSpec == null) {
+      return FILE_UPLOAD_DOWNLOAD_CLIENT;
+    }
+    return new FileUploadDownloadClient(getHttpClientConfig(tlsSpec),
+        TlsUtils.createSslContextWithoutAutoRenewal(tlsSpec.getKeyStoreType(), 
tlsSpec.getKeyStorePath(),
+            tlsSpec.getKeyStorePassword(), tlsSpec.getTrustStoreType(), 
tlsSpec.getTrustStorePath(),
+            tlsSpec.getTrustStorePassword()));
+  }
+
+  @VisibleForTesting
+  static HttpClientConfig getHttpClientConfig(TlsSpec tlsSpec) {
+    PinotConfiguration httpClientConfiguration = new PinotConfiguration();
+    httpClientConfiguration.setProperty(HTTP_CLIENT_CONNECTION_TIMEOUT_CONFIG, 
tlsSpec.getConnectTimeout());
+    return HttpClientConfig.newBuilder(httpClientConfiguration).build();
+  }
+
+  static void closeFileUploadDownloadClient(SegmentGenerationJobSpec spec,
+      FileUploadDownloadClient fileUploadDownloadClient) {
+    if (spec.getTlsSpec() == null) {
+      return;
+    }
+    try {
+      fileUploadDownloadClient.close();
+    } catch (IOException e) {
+      LOGGER.warn("Unable to close TLS-aware file upload/download client", e);
+    }
+  }
+
+  static int getSocketTimeoutMs(SegmentGenerationJobSpec spec) {
+    TlsSpec tlsSpec = spec.getTlsSpec();
+    return tlsSpec != null ? tlsSpec.getReadTimeout() : 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS;
+  }
 
   public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String 
prefix, String suffix) {
     if (StringUtils.isEmpty(prefix) && StringUtils.isEmpty(suffix)) {
@@ -164,57 +206,63 @@ public class SegmentPushUtils implements Serializable {
     LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
         Arrays.toString(tarFilePaths.subList(0, Math.min(5, 
tarFilePaths.size())).toArray()),
         Arrays.toString(spec.getPinotClusterSpecs()), tableName);
-    for (String tarFilePath : tarFilePaths) {
-      URI tarFileURI = URI.create(tarFilePath);
-      File tarFile = new File(tarFilePath);
-      String fileName = tarFile.getName();
-      
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
-      String segmentName = fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length());
-      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
-        URI controllerURI;
-        try {
-          controllerURI = new URI(pinotClusterSpec.getControllerURI());
-        } catch (URISyntaxException e) {
-          throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
-        }
-        LOGGER.info("Pushing segment: {} to location: {} for table {}", 
segmentName, controllerURI, tableName);
-        int attempts = 1;
-        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
-          attempts = spec.getPushJobSpec().getPushAttempts();
-        }
-        long retryWaitMs = 1000L;
-        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
-          retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
-        }
-        RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
-          try (InputStream inputStream = fileSystem.open(tarFileURI)) {
-            SimpleHttpResponse response =
-                
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
-                    segmentName, inputStream, headers,
-                    parameters, tableName, tableType);
-            LOGGER.info("Response for pushing table {} segment {} to location 
{} - {}: {}", tableName, segmentName,
-                controllerURI, response.getStatusCode(), 
response.getResponse());
-            return true;
-          } catch (HttpErrorStatusException e) {
-            int statusCode = e.getStatusCode();
-            if (statusCode >= 500) {
-              // Temporary exception
-              LOGGER.warn("Caught temporary exception while pushing table: {} 
segment: {} to {}, will retry", tableName,
-                  segmentName, controllerURI, e);
-              return false;
-            } else {
-              // Permanent exception
-              LOGGER.error("Caught permanent exception while pushing table: {} 
segment: {} to {}, won't retry",
-                  tableName, segmentName, controllerURI, e);
-              throw e;
-            }
-          } finally {
-            if (cleanUpOutputDir) {
-              fileSystem.delete(tarFileURI, true);
-            }
+    FileUploadDownloadClient fileUploadDownloadClient = 
getOrCreateFileUploadDownloadClient(spec);
+    int socketTimeoutMs = getSocketTimeoutMs(spec);
+    try {
+      for (String tarFilePath : tarFilePaths) {
+        URI tarFileURI = URI.create(tarFilePath);
+        File tarFile = new File(tarFilePath);
+        String fileName = tarFile.getName();
+        
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+        String segmentName = fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length());
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
           }
-        });
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", 
segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
+            try (InputStream inputStream = fileSystem.open(tarFileURI)) {
+              SimpleHttpResponse response =
+                  
fileUploadDownloadClient.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
+                      segmentName, inputStream, headers, 
makeUploadSegmentParams(parameters, tableName, tableType),
+                      socketTimeoutMs);
+              LOGGER.info("Response for pushing table {} segment {} to 
location {} - {}: {}", tableName, segmentName,
+                  controllerURI, response.getStatusCode(), 
response.getResponse());
+              return true;
+            } catch (HttpErrorStatusException e) {
+              int statusCode = e.getStatusCode();
+              if (statusCode >= 500) {
+                // Temporary exception
+                LOGGER.warn("Caught temporary exception while pushing table: 
{} segment: {} to {}, will retry",
+                    tableName, segmentName, controllerURI, e);
+                return false;
+              } else {
+                // Permanent exception
+                LOGGER.error("Caught permanent exception while pushing table: 
{} segment: {} to {}, won't retry",
+                    tableName, segmentName, controllerURI, e);
+                throw e;
+              }
+            } finally {
+              if (cleanUpOutputDir) {
+                fileSystem.delete(tarFileURI, true);
+              }
+            }
+          });
+        }
       }
+    } finally {
+      closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
     }
   }
 
@@ -225,53 +273,59 @@ public class SegmentPushUtils implements Serializable {
     LOGGER.info("Start sending table {} segment URIs: {} to locations: {}", 
tableName,
         Arrays.toString(segmentUris.subList(0, Math.min(5, 
segmentUris.size())).toArray()),
         Arrays.toString(spec.getPinotClusterSpecs()));
-    for (String segmentUri : segmentUris) {
-      URI segmentURI = URI.create(segmentUri);
-      PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
-      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
-        URI controllerURI;
-        try {
-          controllerURI = new URI(pinotClusterSpec.getControllerURI());
-        } catch (URISyntaxException e) {
-          throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
-        }
-        LOGGER.info("Sending table {} segment URI: {} to location: {} for ", 
tableName, segmentUri, controllerURI);
-        int attempts = 1;
-        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
-          attempts = spec.getPushJobSpec().getPushAttempts();
-        }
-        long retryWaitMs = 1000L;
-        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
-          retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
-        }
-        RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
+    FileUploadDownloadClient fileUploadDownloadClient = 
getOrCreateFileUploadDownloadClient(spec);
+    int socketTimeoutMs = getSocketTimeoutMs(spec);
+    try {
+      for (String segmentUri : segmentUris) {
+        URI segmentURI = URI.create(segmentUri);
+        PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
           try {
-            SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-                
.sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), 
segmentUri,
-                    headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
-            LOGGER.info("Response for pushing table {} segment uri {} to 
location {} - {}: {}", tableName, segmentUri,
-                controllerURI, response.getStatusCode(), 
response.getResponse());
-            return true;
-          } catch (HttpErrorStatusException e) {
-            int statusCode = e.getStatusCode();
-            if (statusCode >= 500) {
-              // Temporary exception
-              LOGGER.warn("Caught temporary exception while pushing table: {} 
segment uri: {} to {}, will retry",
-                  tableName, segmentUri, controllerURI, e);
-              return false;
-            } else {
-              // Permanent exception
-              LOGGER.error("Caught permanent exception while pushing table: {} 
segment uri: {} to {}, won't retry",
-                  tableName, segmentUri, controllerURI, e);
-              throw e;
-            }
-          } finally {
-            if (spec.isCleanUpOutputDir()) {
-              outputDirFS.delete(segmentURI, true);
-            }
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
           }
-        });
+          LOGGER.info("Sending table {} segment URI: {} to location: {} for ", 
tableName, segmentUri, controllerURI);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
+            try {
+              SimpleHttpResponse response = fileUploadDownloadClient
+                  
.sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), 
segmentUri,
+                      headers, parameters, socketTimeoutMs);
+              LOGGER.info("Response for pushing table {} segment uri {} to 
location {} - {}: {}", tableName,
+                  segmentUri, controllerURI, response.getStatusCode(), 
response.getResponse());
+              return true;
+            } catch (HttpErrorStatusException e) {
+              int statusCode = e.getStatusCode();
+              if (statusCode >= 500) {
+                // Temporary exception
+                LOGGER.warn("Caught temporary exception while pushing table: 
{} segment uri: {} to {}, will retry",
+                    tableName, segmentUri, controllerURI, e);
+                return false;
+              } else {
+                // Permanent exception
+                LOGGER.error("Caught permanent exception while pushing table: 
{} segment uri: {} to {}, won't retry",
+                    tableName, segmentUri, controllerURI, e);
+                throw e;
+              }
+            } finally {
+              if (spec.isCleanUpOutputDir()) {
+                outputDirFS.delete(segmentURI, true);
+              }
+            }
+          });
+        }
       }
+    } finally {
+      closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
     }
   }
 
@@ -295,83 +349,90 @@ public class SegmentPushUtils implements Serializable {
     String tableName = spec.getTableSpec().getTableName();
     LOGGER.info("Start pushing segment metadata: {} to locations: {} for table 
{}", segmentUriToTarPathMap,
         Arrays.toString(spec.getPinotClusterSpecs()), tableName);
-    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
-      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
-      String fileName = new File(tarFilePath).getName();
-      // segments stored in Pinot deep store do not have .tar.gz extension
-      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
-          ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
-      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
-      File segmentMetadataFile;
-      // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the remote
-      // directory. This is to avoid generating a new segment metadata tar gz 
file every time we push a segment,
-      // which requires downloading the entire segment tar gz file.
-
-      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, 
segmentName);
-      LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
-      if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
-        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
-            "segmentMetadata-" + UUID.randomUUID() + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
-        if (segmentMetadataFile.exists()) {
-          FileUtils.forceDelete(segmentMetadataFile);
-        }
-        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
-      } else {
-        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
-      }
-      try {
-        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
-          URI controllerURI;
-          try {
-            controllerURI = new URI(pinotClusterSpec.getControllerURI());
-          } catch (URISyntaxException e) {
-            throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
-          }
-          LOGGER.info("Pushing segment: {} to location: {} for table {}", 
segmentName, controllerURI, tableName);
-          int attempts = 1;
-          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
-            attempts = spec.getPushJobSpec().getPushAttempts();
-          }
-          long retryWaitMs = 1000L;
-          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
-            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+    FileUploadDownloadClient fileUploadDownloadClient = 
getOrCreateFileUploadDownloadClient(spec);
+    int socketTimeoutMs = getSocketTimeoutMs(spec);
+    try {
+      for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+        String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+        String fileName = new File(tarFilePath).getName();
+        // segments stored in Pinot deep store do not have .tar.gz extension
+        String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+            ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+        SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+        File segmentMetadataFile;
+        // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the remote
+        // directory. This is to avoid generating a new segment metadata tar 
gz file every time we push a segment,
+        // which requires downloading the entire segment tar gz file.
+
+        URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, 
segmentName);
+        LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
+        if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
+          segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+              "segmentMetadata-" + UUID.randomUUID() + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+          if (segmentMetadataFile.exists()) {
+            FileUtils.forceDelete(segmentMetadataFile);
           }
-          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
-            List<Header> reqHttpHeaders = new ArrayList<>(headers);
+          fileSystem.copyToLocalFile(metadataTarGzFilePath, 
segmentMetadataFile);
+        } else {
+          segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
+        }
+        try {
+          for (PinotClusterSpec pinotClusterSpec : 
spec.getPinotClusterSpecs()) {
+            URI controllerURI;
             try {
-              reqHttpHeaders.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, 
segmentUriPath));
-              reqHttpHeaders.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
-                  
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
-              if (spec.getPushJobSpec() != null) {
-                reqHttpHeaders.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
-                    
String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
-              }
+              controllerURI = new URI(pinotClusterSpec.getControllerURI());
+            } catch (URISyntaxException e) {
+              throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
+            }
+            LOGGER.info("Pushing segment: {} to location: {} for table {}", 
segmentName, controllerURI, tableName);
+            int attempts = 1;
+            if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
+              attempts = spec.getPushJobSpec().getPushAttempts();
+            }
+            long retryWaitMs = 1000L;
+            if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+              retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+            }
+            RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
+              List<Header> reqHttpHeaders = new ArrayList<>(headers);
+              try {
+                reqHttpHeaders.add(
+                    new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, 
segmentUriPath));
+                reqHttpHeaders.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+                    
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+                if (spec.getPushJobSpec() != null) {
+                  reqHttpHeaders.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+                      
String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+                }
 
-              SimpleHttpResponse response = 
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(
-                  FileUploadDownloadClient.getUploadSegmentURI(controllerURI), 
segmentName,
-                  segmentMetadataFile, reqHttpHeaders, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
-              LOGGER.info("Response for pushing table {} segment {} to 
location {} - {}: {}", tableName, segmentName,
-                  controllerURI, response.getStatusCode(), 
response.getResponse());
-              return true;
-            } catch (HttpErrorStatusException e) {
-              int statusCode = e.getStatusCode();
-              if (statusCode >= 500) {
-                // Temporary exception
-                LOGGER.warn("Caught temporary exception while pushing table: 
{} segment: {} to {}, will retry",
-                    tableName, segmentName, controllerURI, e);
-                return false;
-              } else {
-                // Permanent exception
-                LOGGER.error("Caught permanent exception while pushing table: 
{} segment: {} to {}, won't retry",
-                    tableName, segmentName, controllerURI, e);
-                throw e;
+                SimpleHttpResponse response = 
fileUploadDownloadClient.uploadSegmentMetadata(
+                    
FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName,
+                    segmentMetadataFile, reqHttpHeaders, parameters, 
socketTimeoutMs);
+                LOGGER.info("Response for pushing table {} segment {} to 
location {} - {}: {}", tableName, segmentName,
+                    controllerURI, response.getStatusCode(), 
response.getResponse());
+                return true;
+              } catch (HttpErrorStatusException e) {
+                int statusCode = e.getStatusCode();
+                if (statusCode >= 500) {
+                  // Temporary exception
+                  LOGGER.warn("Caught temporary exception while pushing table: 
{} segment: {} to {}, will retry",
+                      tableName, segmentName, controllerURI, e);
+                  return false;
+                } else {
+                  // Permanent exception
+                  LOGGER.error("Caught permanent exception while pushing 
table: {} segment: {} to {}, won't retry",
+                      tableName, segmentName, controllerURI, e);
+                  throw e;
+                }
               }
-            }
-          });
+            });
+          }
+        } finally {
+          FileUtils.deleteQuietly(segmentMetadataFile);
         }
-      } finally {
-        FileUtils.deleteQuietly(segmentMetadataFile);
       }
+    } finally {
+      closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
     }
   }
 
@@ -384,6 +445,8 @@ public class SegmentPushUtils implements Serializable {
     Map<String, File> allSegmentsMetadataMap = new HashMap<>();
     File allSegmentsMetadataTarFile = null;
     int nThreads = 
spec.getPushJobSpec().getSegmentMetadataGenerationParallelism();
+    FileUploadDownloadClient fileUploadDownloadClient = 
getOrCreateFileUploadDownloadClient(spec);
+    int socketTimeoutMs = getSocketTimeoutMs(spec);
     ExecutorService executor = Executors.newFixedThreadPool(nThreads);
     LOGGER.info("Start pushing segment metadata: {} to locations: {} for 
table: {} with parallelism: {}",
         segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()), 
tableName,
@@ -419,8 +482,8 @@ public class SegmentPushUtils implements Serializable {
           try {
             addHeaders(spec, reqHttpHeaders);
             URI segmentUploadURI = getBatchSegmentUploadURI(controllerURI);
-            SimpleHttpResponse response = 
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
-                allSegmentsMetadataMap, reqHttpHeaders, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+            SimpleHttpResponse response = 
fileUploadDownloadClient.uploadSegmentMetadataFiles(segmentUploadURI,
+                allSegmentsMetadataMap, reqHttpHeaders, parameters, 
socketTimeoutMs);
             LOGGER.info("Response for pushing table {} segments {} to location 
{} - {}: {}", tableName,
                 segmentMetadataFileMap.keySet(), controllerURI, 
response.getStatusCode(), response.getResponse());
             return true;
@@ -447,6 +510,7 @@ public class SegmentPushUtils implements Serializable {
       if (allSegmentsMetadataTarFile != null) {
         FileUtils.deleteQuietly(allSegmentsMetadataTarFile);
       }
+      closeFileUploadDownloadClient(spec, fileUploadDownloadClient);
       executor.shutdown();
     }
   }
@@ -522,6 +586,14 @@ public class SegmentPushUtils implements Serializable {
     }
   }
 
+  private static List<NameValuePair> 
makeUploadSegmentParams(List<NameValuePair> parameters, String tableName,
+      TableType tableType) {
+    List<NameValuePair> requestParams = parameters == null ? new ArrayList<>() 
: new ArrayList<>(parameters);
+    requestParams.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName));
+    requestParams.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, 
tableType.name()));
+    return requestParams;
+  }
+
   // Method helps create an uber tar file which contains the metadata files 
for all segments that are to be uploaded.
   // Additionally, it contains a segmentName to segmentDownloadURI mapping 
file which allows us to avoid sending the
   // segmentDownloadURI as a header field as there are limitations on the 
number of headers allowed in the http request.
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
index 8a86283e15f..0bf6005b8c3 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
@@ -18,11 +18,22 @@
  */
 package org.apache.pinot.segment.local.utils;
 
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsParameters;
+import com.sun.net.httpserver.HttpsServer;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.security.SecureRandom;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -30,15 +41,28 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.NameValuePair;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.http.HttpClientConfig;
+import org.apache.pinot.common.utils.tls.TlsUtils;
 import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TlsSpec;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -49,10 +73,22 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 
 
 public class SegmentPushUtilsTest {
+  private static final String TLS_RESOURCE_FOLDER = "tls/";
+  private static final String TLS_KEYSTORE_FILE = "keystore.p12";
+  private static final String TLS_TRUSTSTORE_FILE = "truststore.p12";
+  private static final String TLS_STORE_PASSWORD = "changeit";
+  private static final String TLS_STORE_TYPE = "PKCS12";
+  private static final String TEST_TABLE_NAME = "testTable";
+  private static final String TEST_SEGMENT_NAME = "testSegment";
+  private static final String TEST_SEGMENT_URI = 
"file:///tmp/testSegment.tar.gz";
+  private static final String TEST_LINEAGE_ENTRY_ID = "lineageEntry";
+  private static final String OK_RESPONSE = "OK";
+
   private File _tempDir;
 
   @BeforeMethod
@@ -83,6 +119,270 @@ public class SegmentPushUtilsTest {
     Assert.assertEquals(nameValuePair.getValue(), "true");
   }
 
+  @Test
+  public void testSendSegmentUrisHonorsTlsSpec()
+      throws Exception {
+    AtomicBoolean receivedTlsRequest = new AtomicBoolean();
+    HttpsServer httpsServer =
+        createHttpsServer("/v2/segments", new 
TestSegmentUploadHandler(receivedTlsRequest));
+    try {
+      URI controllerUri = new URI("https://localhost:"; + 
httpsServer.getAddress().getPort());
+
+      SegmentGenerationJobSpec defaultJobSpec = 
createSegmentGenerationJobSpec(controllerUri, null);
+      assertThrows(Exception.class, () -> 
SegmentPushUtils.sendSegmentUris(defaultJobSpec, List.of(TEST_SEGMENT_URI)));
+
+      SegmentGenerationJobSpec tlsJobSpec = 
createSegmentGenerationJobSpec(controllerUri, createTlsSpec());
+      SegmentPushUtils.sendSegmentUris(tlsJobSpec, List.of(TEST_SEGMENT_URI));
+      assertTrue(receivedTlsRequest.get());
+    } finally {
+      httpsServer.stop(0);
+    }
+  }
+
+  @Test
+  public void testConsistentDataPushGetSegmentsToReplaceHonorsTlsSpec()
+      throws Exception {
+    AtomicBoolean receivedTlsRequest = new AtomicBoolean();
+    HttpsServer httpsServer =
+        createHttpsServer("/segments/" + TEST_TABLE_NAME, new 
TestSegmentListHandler(receivedTlsRequest));
+    try {
+      URI controllerUri = new URI("https://localhost:"; + 
httpsServer.getAddress().getPort());
+
+      SegmentGenerationJobSpec tlsJobSpec = 
createSegmentGenerationJobSpec(controllerUri, createTlsSpec());
+      Map<URI, List<String>> segmentsToReplace =
+          ConsistentDataPushUtils.getSegmentsToReplace(tlsJobSpec, 
TEST_TABLE_NAME);
+
+      assertEquals(segmentsToReplace.get(controllerUri), 
List.of(TEST_SEGMENT_NAME));
+      assertTrue(receivedTlsRequest.get());
+    } finally {
+      httpsServer.stop(0);
+    }
+  }
+
+  @Test(timeOut = 5000)
+  public void testSendSegmentUrisHonorsTlsSpecReadTimeout()
+      throws Exception {
+    AtomicBoolean receivedTlsRequest = new AtomicBoolean();
+    CountDownLatch releaseResponse = new CountDownLatch(1);
+    HttpsServer httpsServer =
+        createHttpsServer("/v2/segments", new 
TestSegmentUploadHandler(receivedTlsRequest, releaseResponse));
+    try {
+      URI controllerUri = new URI("https://localhost:"; + 
httpsServer.getAddress().getPort());
+      TlsSpec tlsSpec = createTlsSpec();
+      tlsSpec.setReadTimeout(100);
+      SegmentGenerationJobSpec tlsJobSpec = 
createSegmentGenerationJobSpec(controllerUri, tlsSpec);
+
+      assertThrows(Exception.class, () -> 
SegmentPushUtils.sendSegmentUris(tlsJobSpec, List.of(TEST_SEGMENT_URI)));
+      assertTrue(receivedTlsRequest.get());
+    } finally {
+      releaseResponse.countDown();
+      httpsServer.stop(0);
+    }
+  }
+
+  @Test
+  public void testTlsSpecConnectTimeoutConfigIsApplied() {
+    TlsSpec tlsSpec = createTlsSpec();
+    tlsSpec.setConnectTimeout(1234);
+
+    HttpClientConfig httpClientConfig = 
SegmentPushUtils.getHttpClientConfig(tlsSpec);
+
+    assertEquals(httpClientConfig.getConnectionTimeoutMs(), 1234);
+  }
+
+  @Test
+  public void testConsistentDataPushStartEndRevertHonorsTlsSpec()
+      throws Exception {
+    AtomicBoolean receivedStartRequest = new AtomicBoolean();
+    AtomicBoolean receivedEndRequest = new AtomicBoolean();
+    AtomicBoolean receivedRevertRequest = new AtomicBoolean();
+    HttpsServer httpsServer = createHttpsServer(Map.of(
+        "/segments/" + TEST_TABLE_NAME + "/startReplaceSegments",
+        new TestConsistentDataPushHandler(receivedStartRequest,
+            "{\"segmentLineageEntryId\":\"" + TEST_LINEAGE_ENTRY_ID + "\"}", 
"type=OFFLINE", "forceCleanup=true"),
+        "/segments/" + TEST_TABLE_NAME + "/endReplaceSegments",
+        new TestConsistentDataPushHandler(receivedEndRequest, OK_RESPONSE,
+            "segmentLineageEntryId=" + TEST_LINEAGE_ENTRY_ID, "cleanup=false"),
+        "/segments/" + TEST_TABLE_NAME + "/revertReplaceSegments",
+        new TestConsistentDataPushHandler(receivedRevertRequest, OK_RESPONSE,
+            "segmentLineageEntryId=" + TEST_LINEAGE_ENTRY_ID, 
"forceRevert=true")));
+    try {
+      URI controllerUri = new URI("https://localhost:"; + 
httpsServer.getAddress().getPort());
+      SegmentGenerationJobSpec tlsJobSpec = 
createSegmentGenerationJobSpec(controllerUri, createTlsSpec());
+
+      Map<URI, String> lineageEntryIds = 
ConsistentDataPushUtils.startReplaceSegments(tlsJobSpec,
+          Map.of(controllerUri, List.of("oldSegment")), 
List.of(TEST_SEGMENT_NAME));
+      assertEquals(lineageEntryIds.get(controllerUri), TEST_LINEAGE_ENTRY_ID);
+
+      ConsistentDataPushUtils.endReplaceSegments(tlsJobSpec, lineageEntryIds);
+      ConsistentDataPushUtils.handleUploadException(tlsJobSpec, 
lineageEntryIds, new RuntimeException("test"));
+
+      assertTrue(receivedStartRequest.get());
+      assertTrue(receivedEndRequest.get());
+      assertTrue(receivedRevertRequest.get());
+    } finally {
+      httpsServer.stop(0);
+    }
+  }
+
+  private static HttpsServer createHttpsServer(String path, HttpHandler 
handler)
+      throws Exception {
+    return createHttpsServer(Map.of(path, handler));
+  }
+
+  private static HttpsServer createHttpsServer(Map<String, HttpHandler> 
handlers)
+      throws Exception {
+    SSLContext sslContext = createTestSslContext();
+    HttpsServer server = HttpsServer.create(new InetSocketAddress("localhost", 
0), 0);
+    server.setHttpsConfigurator(new HttpsConfigurator(sslContext) {
+      @Override
+      public void configure(HttpsParameters params) {
+        SSLParameters sslParameters = sslContext.getDefaultSSLParameters();
+        sslParameters.setNeedClientAuth(true);
+        params.setSSLParameters(sslParameters);
+      }
+    });
+    handlers.forEach(server::createContext);
+    server.setExecutor(null);
+    server.start();
+    return server;
+  }
+
+  private static SegmentGenerationJobSpec createSegmentGenerationJobSpec(URI 
controllerUri, TlsSpec tlsSpec) {
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(TEST_TABLE_NAME);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(controllerUri.toString());
+
+    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+    jobSpec.setTableSpec(tableSpec);
+    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{pinotClusterSpec});
+    jobSpec.setPushJobSpec(new PushJobSpec());
+    jobSpec.setTlsSpec(tlsSpec);
+    return jobSpec;
+  }
+
+  private static SSLContext createTestSslContext()
+      throws Exception {
+    KeyManagerFactory keyManagerFactory =
+        
TlsUtils.createKeyManagerFactory(getTlsResourcePath(TLS_KEYSTORE_FILE), 
TLS_STORE_PASSWORD, TLS_STORE_TYPE);
+    TrustManagerFactory trustManagerFactory =
+        
TlsUtils.createTrustManagerFactory(getTlsResourcePath(TLS_TRUSTSTORE_FILE), 
TLS_STORE_PASSWORD, TLS_STORE_TYPE);
+    SSLContext sslContext = SSLContext.getInstance("TLS");
+    sslContext.init(keyManagerFactory.getKeyManagers(), 
trustManagerFactory.getTrustManagers(), new SecureRandom());
+    return sslContext;
+  }
+
+  private static TlsSpec createTlsSpec() {
+    TlsSpec tlsSpec = new TlsSpec();
+    tlsSpec.setKeyStoreType(TLS_STORE_TYPE);
+    tlsSpec.setKeyStorePath(getTlsResourcePath(TLS_KEYSTORE_FILE));
+    tlsSpec.setKeyStorePassword(TLS_STORE_PASSWORD);
+    tlsSpec.setTrustStoreType(TLS_STORE_TYPE);
+    tlsSpec.setTrustStorePath(getTlsResourcePath(TLS_TRUSTSTORE_FILE));
+    tlsSpec.setTrustStorePassword(TLS_STORE_PASSWORD);
+    return tlsSpec;
+  }
+
+  private static String getTlsResourcePath(String fileName) {
+    URL resource = 
SegmentPushUtilsTest.class.getClassLoader().getResource(TLS_RESOURCE_FOLDER + 
fileName);
+    Assert.assertNotNull(resource, "Missing TLS test resource: " + fileName);
+    return resource.toString();
+  }
+
+  private static class TestSegmentUploadHandler implements HttpHandler {
+    private final AtomicBoolean _receivedTlsRequest;
+    private final CountDownLatch _releaseResponse;
+
+    private TestSegmentUploadHandler(AtomicBoolean receivedTlsRequest) {
+      this(receivedTlsRequest, null);
+    }
+
+    private TestSegmentUploadHandler(AtomicBoolean receivedTlsRequest, 
CountDownLatch releaseResponse) {
+      _receivedTlsRequest = receivedTlsRequest;
+      _releaseResponse = releaseResponse;
+    }
+
+    @Override
+    public void handle(HttpExchange httpExchange)
+        throws IOException {
+      Headers requestHeaders = httpExchange.getRequestHeaders();
+      
assertEquals(requestHeaders.getFirst(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE),
+          FileUploadDownloadClient.FileUploadType.URI.toString());
+      
assertEquals(requestHeaders.getFirst(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI),
 TEST_SEGMENT_URI);
+      _receivedTlsRequest.set(true);
+      if (_releaseResponse != null) {
+        try {
+          _releaseResponse.await(2, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        }
+      }
+
+      writeResponse(httpExchange, OK_RESPONSE);
+    }
+  }
+
+  private static class TestConsistentDataPushHandler implements HttpHandler {
+    private final AtomicBoolean _receivedTlsRequest;
+    private final String _responseBody;
+    private final List<String> _expectedQueryFragments;
+
+    private TestConsistentDataPushHandler(AtomicBoolean receivedTlsRequest, 
String responseBody,
+        String... expectedQueryFragments) {
+      _receivedTlsRequest = receivedTlsRequest;
+      _responseBody = responseBody;
+      _expectedQueryFragments = Arrays.asList(expectedQueryFragments);
+    }
+
+    @Override
+    public void handle(HttpExchange httpExchange)
+        throws IOException {
+      assertEquals(httpExchange.getRequestMethod(), "POST");
+      String query = httpExchange.getRequestURI().getQuery();
+      for (String expectedQueryFragment : _expectedQueryFragments) {
+        assertTrue(query.contains(expectedQueryFragment), query);
+      }
+      _receivedTlsRequest.set(true);
+
+      writeResponse(httpExchange, _responseBody);
+    }
+  }
+
+  private static class TestSegmentListHandler implements HttpHandler {
+    private final AtomicBoolean _receivedTlsRequest;
+
+    private TestSegmentListHandler(AtomicBoolean receivedTlsRequest) {
+      _receivedTlsRequest = receivedTlsRequest;
+    }
+
+    @Override
+    public void handle(HttpExchange httpExchange)
+        throws IOException {
+      _receivedTlsRequest.set(true);
+      writeResponse(httpExchange, "[{\"OFFLINE\":[\"" + TEST_SEGMENT_NAME + 
"\"]}]");
+    }
+  }
+
+  private static void writeResponse(HttpExchange httpExchange, String 
responseBody)
+      throws IOException {
+    byte[] response = responseBody.getBytes(StandardCharsets.UTF_8);
+    httpExchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+    try (OutputStream os = httpExchange.getResponseBody()) {
+      os.write(response);
+    }
+  }
+
+  @Test
+  public void testGetOrCreateFileUploadDownloadClientUsesSharedDefaultClient()
+      throws Exception {
+    SegmentGenerationJobSpec defaultJobSpec = new SegmentGenerationJobSpec();
+    FileUploadDownloadClient defaultClient = 
SegmentPushUtils.getOrCreateFileUploadDownloadClient(defaultJobSpec);
+    
Assert.assertSame(SegmentPushUtils.getOrCreateFileUploadDownloadClient(defaultJobSpec),
 defaultClient);
+  }
+
   @Test
   public void testGetSegmentUriToTarPathMap()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to