jtjeferreira commented on code in PR #827:
URL: https://github.com/apache/pekko-connectors/pull/827#discussion_r1774948240


##########
aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala:
##########
@@ -35,14 +35,16 @@ import pekko.util.ByteString
 import pekko.util.OptionConverters
 import org.slf4j.LoggerFactory
 import software.amazon.awssdk.http.async._
-import software.amazon.awssdk.http.SdkHttpRequest
+import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, 
SdkHttpRequest }
 import software.amazon.awssdk.utils.AttributeMap
 
 import scala.collection.immutable
 import scala.concurrent.duration.Duration
 import scala.concurrent.{ Await, ExecutionContext }
+import scala.jdk.DurationConverters._
 
-class PekkoHttpClient(shutdownHandle: () => Unit, connectionSettings: 
ConnectionPoolSettings)(implicit
+class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val 
connectionSettings: ConnectionPoolSettings)(

Review Comment:
   added `private[awsspi]` for testing visibility



##########
aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala:
##########
@@ -151,18 +153,42 @@ object PekkoHttpClient {
     else throw new RuntimeException(s"Could not parse custom content type 
'$contentTypeStr'.")
   }
 
+  // based on NettyNioAsyncHttpClient and ApacheHttpClient
+  // 
https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code
+  // 
https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code
+  private[awsspi] def buildConnectionPoolSettings(
+      base: ConnectionPoolSettings, attributeMap: AttributeMap): 
ConnectionPoolSettings = {
+    def zeroToInfinite(duration: java.time.Duration): 
scala.concurrent.duration.Duration =
+      if (duration.isZero) scala.concurrent.duration.Duration.Inf
+      else duration.toScala
+
+    base
+      .withUpdatedConnectionSettings(s =>
+        
s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala)
+          
.withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala))
+      
.withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue())
+      
.withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE)))
+  }
+

Review Comment:
   These are settings that the method `buildConnectionPoolSettings` is 
"translating" and the default settings in pekko-http and AWS SDK
   
   | pekko-http setting | AWS SDK setting | 
   | --------------|------------- | 
   | connecting-timeout (10s) | CONNECTION_TIMEOUT (2s) | 
   | idle-timeout = 60 s | CONNECTION_MAX_IDLE_TIMEOUT (60s) | 
   | host-connection-pool.max-connections = 4 | MAX_CONNECTIONS (50) | 
   | host-connection-pool.max-connection-lifetime = infinite | 
CONNECTION_TIME_TO_LIVE (0 AKA infinite)| 
   
   Only `connecting-timeout` and `max-connections` are different...
   
   There might be other settings that can be mapped...



##########
aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala:
##########
@@ -151,18 +153,42 @@ object PekkoHttpClient {
     else throw new RuntimeException(s"Could not parse custom content type 
'$contentTypeStr'.")
   }
 
+  // based on NettyNioAsyncHttpClient and ApacheHttpClient
+  // 
https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code
+  // 
https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code
+  private[awsspi] def buildConnectionPoolSettings(
+      base: ConnectionPoolSettings, attributeMap: AttributeMap): 
ConnectionPoolSettings = {
+    def zeroToInfinite(duration: java.time.Duration): 
scala.concurrent.duration.Duration =
+      if (duration.isZero) scala.concurrent.duration.Duration.Inf
+      else duration.toScala
+
+    base
+      .withUpdatedConnectionSettings(s =>
+        
s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala)
+          
.withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala))
+      
.withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue())
+      
.withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE)))
+  }
+
   def builder() = PekkoHttpClientBuilder()
 
   case class PekkoHttpClientBuilder(private val actorSystem: 
Option[ActorSystem] = None,
       private val executionContext: Option[ExecutionContext] = None,
-      private val connectionPoolSettings: Option[ConnectionPoolSettings] = 
None)
+      private val connectionPoolSettings: Option[ConnectionPoolSettings] = 
None,
+      private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, 
AttributeMap) => ConnectionPoolSettings =

Review Comment:
   the important change of this PR is to add a `connectionPoolSettingsBuilder` 
that can be used to configure the `connectionPoolSettings`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org
For additional commands, e-mail: notifications-h...@pekko.apache.org

Reply via email to