This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a56aa7e8 FTP: Add for FTPS the ability to set KeyManager and 
TrustManager
2a56aa7e8 is described below

commit 2a56aa7e81b0c03816300ddb3a375df1fd3874fc
Author: Sergey Gornostaev <[email protected]>
AuthorDate: Fri Aug 25 23:06:02 2023 +0800

    FTP: Add for FTPS the ability to set KeyManager and TrustManager
    
    ---------
    
    Co-authored-by: Matthew de Detrich <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
---
 .../connectors/ftp/impl/FtpsOperations.scala       |  3 +
 .../apache/pekko/stream/connectors/ftp/model.scala | 24 ++++--
 .../ftp/FtpsWithTrustAndKeyManagersStageSpec.scala | 96 ++++++++++++++++++++++
 project/Dependencies.scala                         |  2 +-
 4 files changed, 119 insertions(+), 6 deletions(-)

diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
index 4fad15ca3..5f725e05e 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
@@ -31,6 +31,9 @@ private[ftp] trait FtpsOperations extends CommonFtpOperations 
{
     Try {
       connectionSettings.proxy.foreach(ftpClient.setProxy)
 
+      connectionSettings.keyManager.foreach(ftpClient.setKeyManager)
+      connectionSettings.trustManager.foreach(ftpClient.setTrustManager)
+
       if (ftpClient.getAutodetectUTF8() != connectionSettings.autodetectUTF8) {
         ftpClient.setAutodetectUTF8(connectionSettings.autodetectUTF8)
       }
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
index e489d09dd..0a05dd994 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
@@ -15,6 +15,8 @@ package org.apache.pekko.stream.connectors.ftp
 
 import java.net.InetAddress
 import java.net.Proxy
+import javax.net.ssl.KeyManager
+import javax.net.ssl.TrustManager
 import java.nio.file.attribute.PosixFilePermission
 
 import org.apache.pekko.annotation.{ DoNotInherit, InternalApi }
@@ -185,7 +187,9 @@ final class FtpsSettings private (
     val passiveMode: Boolean,
     val autodetectUTF8: Boolean,
     val configureConnection: FTPSClient => Unit,
-    val proxy: Option[Proxy]) extends FtpFileSettings {
+    val proxy: Option[Proxy],
+    val keyManager: Option[KeyManager],
+    val trustManager: Option[TrustManager]) extends FtpFileSettings {
 
   def withHost(value: java.net.InetAddress): FtpsSettings = copy(host = value)
   def withPort(value: Int): FtpsSettings = copy(port = value)
@@ -196,6 +200,8 @@ final class FtpsSettings private (
   def withAutodetectUTF8(value: Boolean): FtpsSettings =
     if (autodetectUTF8 == value) this else copy(autodetectUTF8 = value)
   def withProxy(value: Proxy): FtpsSettings = copy(proxy = Some(value))
+  def withKeyManager(value: KeyManager): FtpsSettings = copy(keyManager = 
Some(value))
+  def withTrustManager(value: TrustManager): FtpsSettings = copy(trustManager 
= Some(value))
 
   /**
    * Scala API:
@@ -220,7 +226,9 @@ final class FtpsSettings private (
       passiveMode: Boolean = passiveMode,
       autodetectUTF8: Boolean = autodetectUTF8,
       configureConnection: FTPSClient => Unit = configureConnection,
-      proxy: Option[Proxy] = proxy): FtpsSettings = new FtpsSettings(
+      proxy: Option[Proxy] = proxy,
+      keyManager: Option[KeyManager] = keyManager,
+      trustManager: Option[TrustManager] = trustManager): FtpsSettings = new 
FtpsSettings(
     host = host,
     port = port,
     credentials = credentials,
@@ -228,7 +236,9 @@ final class FtpsSettings private (
     passiveMode = passiveMode,
     autodetectUTF8 = autodetectUTF8,
     configureConnection = configureConnection,
-    proxy = proxy)
+    proxy = proxy,
+    keyManager = keyManager,
+    trustManager = trustManager)
 
   override def toString =
     "FtpsSettings(" +
@@ -239,7 +249,9 @@ final class FtpsSettings private (
     s"passiveMode=$passiveMode," +
     s"autodetectUTF8=$autodetectUTF8" +
     s"configureConnection=$configureConnection," +
-    s"proxy=$proxy)"
+    s"proxy=$proxy" +
+    s"keyManager=$keyManager" +
+    s"trustManager=$trustManager)"
 }
 
 /**
@@ -259,7 +271,9 @@ object FtpsSettings {
     passiveMode = false,
     autodetectUTF8 = false,
     configureConnection = _ => (),
-    proxy = None)
+    proxy = None,
+    keyManager = None,
+    trustManager = None)
 
   /** Java API */
   def create(host: java.net.InetAddress): FtpsSettings = apply(
diff --git 
a/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala
 
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala
new file mode 100644
index 000000000..6fb9557b5
--- /dev/null
+++ 
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.ftp
+
+import org.apache.pekko
+import pekko.stream.IOResult
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.util.ByteString
+import pekko.{ Done, NotUsed }
+import org.mockito.ArgumentMatchers.{ any, anyString }
+import org.mockito.Mockito.{ atLeastOnce, doNothing, verify }
+import org.scalatestplus.mockito.MockitoSugar
+
+import java.net.{ InetAddress, Socket }
+import java.security.cert.X509Certificate
+import javax.net.ssl.{ X509ExtendedKeyManager, X509ExtendedTrustManager }
+import scala.concurrent.Future
+
+class FtpsWithTrustAndKeyManagersStageSpec extends BaseFtpsSpec with 
CommonFtpStageSpec with MockitoSugar {
+
+  // The implementation of X509ExtendedTrustManager and X509ExtendedKeyManager 
is final so
+  // its not possible to put a Mockito spy on it, instead lets just mock the 
classes and the
+  // checkServerTrusted method which is executed only when 
trustManager/keyManager is setup in FtpsSettings
+
+  val keyManager: X509ExtendedKeyManager = mock[X509ExtendedKeyManager]
+  val trustManager: X509ExtendedTrustManager = mock[X509ExtendedTrustManager]
+
+  
doNothing().when(trustManager).checkServerTrusted(any(classOf[Array[X509Certificate]]),
 anyString,
+    any(classOf[Socket]))
+
+  override val settings =
+    FtpsSettings(
+      InetAddress.getByName(HOSTNAME)).withPort(PORT)
+      .withCredentials(CREDENTIALS)
+      .withBinary(true)
+      .withPassiveMode(true)
+      .withTrustManager(trustManager)
+      .withKeyManager(keyManager)
+
+  private def verifyServerCheckCertificate(): Unit =
+    verify(trustManager, 
atLeastOnce()).checkServerTrusted(any(classOf[Array[X509Certificate]]), 
anyString,
+      any(classOf[Socket]))
+
+  private def verifyAfterStream[O, Mat](source: Source[O, Mat]): Source[O, 
Mat] =
+    source.map { result =>
+      verifyServerCheckCertificate()
+      result
+    }
+
+  private def verifyAfterStream[I, Mat](sink: Sink[I, Mat]): Sink[I, Mat] =
+    sink.mapMaterializedValue { result =>
+      verifyServerCheckCertificate()
+      result
+    }
+
+  override protected def listFiles(basePath: String): Source[FtpFile, NotUsed] 
=
+    verifyAfterStream(super.listFiles(basePath))
+
+  override protected def listFilesWithFilter(basePath: String, branchSelector: 
FtpFile => Boolean,
+      emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] =
+    verifyAfterStream(super.listFilesWithFilter(basePath, branchSelector, 
emitTraversedDirectories))
+
+  override protected def retrieveFromPath(path: String, fromRoot: Boolean): 
Source[ByteString, Future[IOResult]] =
+    verifyAfterStream(super.retrieveFromPath(path, fromRoot))
+
+  override protected def retrieveFromPathWithOffset(path: String, offset: 
Long): Source[ByteString, Future[IOResult]] =
+    verifyAfterStream(super.retrieveFromPathWithOffset(path, offset))
+
+  override protected def storeToPath(path: String, append: Boolean): 
Sink[ByteString, Future[IOResult]] =
+    verifyAfterStream(super.storeToPath(path, append))
+
+  override protected def remove(): Sink[FtpFile, Future[IOResult]] =
+    verifyAfterStream(super.remove())
+
+  override protected def move(destinationPath: FtpFile => String): 
Sink[FtpFile, Future[IOResult]] =
+    verifyAfterStream(super.move(destinationPath))
+
+  override protected def mkdir(basePath: String, name: String): Source[Done, 
NotUsed] =
+    verifyAfterStream(super.mkdir(basePath, name))
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d3183886c..13e221d46 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -173,7 +173,7 @@ object Dependencies {
   val Ftp = Seq(
     libraryDependencies ++= Seq(
       "commons-net" % "commons-net" % "3.8.0",
-      "com.hierynomus" % "sshj" % "0.33.0"))
+      "com.hierynomus" % "sshj" % "0.33.0") ++ Mockito)
 
   val GeodeVersion = "1.15.0"
   val GeodeVersionForDocs = "115"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to