jtjeferreira commented on code in PR #827: URL: https://github.com/apache/pekko-connectors/pull/827#discussion_r1774948240
########## aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala: ########## @@ -35,14 +35,16 @@ import pekko.util.ByteString import pekko.util.OptionConverters import org.slf4j.LoggerFactory import software.amazon.awssdk.http.async._ -import software.amazon.awssdk.http.SdkHttpRequest +import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, SdkHttpRequest } import software.amazon.awssdk.utils.AttributeMap import scala.collection.immutable import scala.concurrent.duration.Duration import scala.concurrent.{ Await, ExecutionContext } +import scala.jdk.DurationConverters._ -class PekkoHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit +class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connectionSettings: ConnectionPoolSettings)( Review Comment: added `private[awsspi]` for testing visibility ########## aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala: ########## @@ -151,18 +153,42 @@ object PekkoHttpClient { else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.") } + // based on NettyNioAsyncHttpClient and ApacheHttpClient + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code + private[awsspi] def buildConnectionPoolSettings( + base: ConnectionPoolSettings, attributeMap: AttributeMap): ConnectionPoolSettings = { + def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = + if (duration.isZero) scala.concurrent.duration.Duration.Inf + else duration.toScala + + base + .withUpdatedConnectionSettings(s => + 
s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala) + .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala)) + .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()) + .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE))) + } + Review Comment: These are the settings that the method `buildConnectionPoolSettings` translates, shown with their default values in pekko-http and in the AWS SDK: | pekko-http setting (default) | AWS SDK setting (default) | | --------------|------------- | | connecting-timeout (10s) | CONNECTION_TIMEOUT (2s) | | idle-timeout (60s) | CONNECTION_MAX_IDLE_TIMEOUT (60s) | | host-connection-pool.max-connections (4) | MAX_CONNECTIONS (50) | | host-connection-pool.max-connection-lifetime (infinite) | CONNECTION_TIME_TO_LIVE (0, i.e. infinite) | Only the defaults for `connecting-timeout` and `max-connections` differ between the two... There might be other settings that can be mapped... 
########## aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala: ########## @@ -151,18 +153,42 @@ object PekkoHttpClient { else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.") } + // based on NettyNioAsyncHttpClient and ApacheHttpClient + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code + private[awsspi] def buildConnectionPoolSettings( + base: ConnectionPoolSettings, attributeMap: AttributeMap): ConnectionPoolSettings = { + def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = + if (duration.isZero) scala.concurrent.duration.Duration.Inf + else duration.toScala + + base + .withUpdatedConnectionSettings(s => + s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala) + .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala)) + .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()) + .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE))) + } + def builder() = PekkoHttpClientBuilder() case class PekkoHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None, private val executionContext: Option[ExecutionContext] = None, - private val connectionPoolSettings: Option[ConnectionPoolSettings] = None) + private val connectionPoolSettings: Option[ConnectionPoolSettings] = None, + private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings = Review Comment: the important change of this PR is to add a `connectionPoolSettingsBuilder` that can be used 
to configure the `connectionPoolSettings` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For additional commands, e-mail: notifications-h...@pekko.apache.org