This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 2a56aa7e8 FTP: Add for FTPS the ability to set KeyManager and
TrustManager
2a56aa7e8 is described below
commit 2a56aa7e81b0c03816300ddb3a375df1fd3874fc
Author: Sergey Gornostaev <[email protected]>
AuthorDate: Fri Aug 25 23:06:02 2023 +0800
FTP: Add for FTPS the ability to set KeyManager and TrustManager
---------
Co-authored-by: Matthew de Detrich <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---
.../connectors/ftp/impl/FtpsOperations.scala | 3 +
.../apache/pekko/stream/connectors/ftp/model.scala | 24 ++++--
.../ftp/FtpsWithTrustAndKeyManagersStageSpec.scala | 96 ++++++++++++++++++++++
project/Dependencies.scala | 2 +-
4 files changed, 119 insertions(+), 6 deletions(-)
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
index 4fad15ca3..5f725e05e 100644
---
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
+++
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
@@ -31,6 +31,9 @@ private[ftp] trait FtpsOperations extends CommonFtpOperations
{
Try {
connectionSettings.proxy.foreach(ftpClient.setProxy)
+ connectionSettings.keyManager.foreach(ftpClient.setKeyManager)
+ connectionSettings.trustManager.foreach(ftpClient.setTrustManager)
+
if (ftpClient.getAutodetectUTF8() != connectionSettings.autodetectUTF8) {
ftpClient.setAutodetectUTF8(connectionSettings.autodetectUTF8)
}
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
index e489d09dd..0a05dd994 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
@@ -15,6 +15,8 @@ package org.apache.pekko.stream.connectors.ftp
import java.net.InetAddress
import java.net.Proxy
+import javax.net.ssl.KeyManager
+import javax.net.ssl.TrustManager
import java.nio.file.attribute.PosixFilePermission
import org.apache.pekko.annotation.{ DoNotInherit, InternalApi }
@@ -185,7 +187,9 @@ final class FtpsSettings private (
val passiveMode: Boolean,
val autodetectUTF8: Boolean,
val configureConnection: FTPSClient => Unit,
- val proxy: Option[Proxy]) extends FtpFileSettings {
+ val proxy: Option[Proxy],
+ val keyManager: Option[KeyManager],
+ val trustManager: Option[TrustManager]) extends FtpFileSettings {
def withHost(value: java.net.InetAddress): FtpsSettings = copy(host = value)
def withPort(value: Int): FtpsSettings = copy(port = value)
@@ -196,6 +200,8 @@ final class FtpsSettings private (
def withAutodetectUTF8(value: Boolean): FtpsSettings =
if (autodetectUTF8 == value) this else copy(autodetectUTF8 = value)
def withProxy(value: Proxy): FtpsSettings = copy(proxy = Some(value))
+ def withKeyManager(value: KeyManager): FtpsSettings = copy(keyManager =
Some(value))
+ def withTrustManager(value: TrustManager): FtpsSettings = copy(trustManager
= Some(value))
/**
* Scala API:
@@ -220,7 +226,9 @@ final class FtpsSettings private (
passiveMode: Boolean = passiveMode,
autodetectUTF8: Boolean = autodetectUTF8,
configureConnection: FTPSClient => Unit = configureConnection,
- proxy: Option[Proxy] = proxy): FtpsSettings = new FtpsSettings(
+ proxy: Option[Proxy] = proxy,
+ keyManager: Option[KeyManager] = keyManager,
+ trustManager: Option[TrustManager] = trustManager): FtpsSettings = new
FtpsSettings(
host = host,
port = port,
credentials = credentials,
@@ -228,7 +236,9 @@ final class FtpsSettings private (
passiveMode = passiveMode,
autodetectUTF8 = autodetectUTF8,
configureConnection = configureConnection,
- proxy = proxy)
+ proxy = proxy,
+ keyManager = keyManager,
+ trustManager = trustManager)
override def toString =
"FtpsSettings(" +
@@ -239,7 +249,9 @@ final class FtpsSettings private (
s"passiveMode=$passiveMode," +
s"autodetectUTF8=$autodetectUTF8" +
s"configureConnection=$configureConnection," +
- s"proxy=$proxy)"
+ s"proxy=$proxy" +
+ s"keyManager=$keyManager" +
+ s"trustManager=$trustManager)"
}
/**
@@ -259,7 +271,9 @@ object FtpsSettings {
passiveMode = false,
autodetectUTF8 = false,
configureConnection = _ => (),
- proxy = None)
+ proxy = None,
+ keyManager = None,
+ trustManager = None)
/** Java API */
def create(host: java.net.InetAddress): FtpsSettings = apply(
diff --git
a/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala
new file mode 100644
index 000000000..6fb9557b5
--- /dev/null
+++
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.ftp
+
+import org.apache.pekko
+import pekko.stream.IOResult
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.util.ByteString
+import pekko.{ Done, NotUsed }
+import org.mockito.ArgumentMatchers.{ any, anyString }
+import org.mockito.Mockito.{ atLeastOnce, doNothing, verify }
+import org.scalatestplus.mockito.MockitoSugar
+
+import java.net.{ InetAddress, Socket }
+import java.security.cert.X509Certificate
+import javax.net.ssl.{ X509ExtendedKeyManager, X509ExtendedTrustManager }
+import scala.concurrent.Future
+
+class FtpsWithTrustAndKeyManagersStageSpec extends BaseFtpsSpec with
CommonFtpStageSpec with MockitoSugar {
+
+ // The implementation of X509ExtendedTrustManager and X509ExtendedKeyManager
is final so
+ // its not possible to put a Mockito spy on it, instead lets just mock the
classes and the
+ // checkServerTrusted method which is executed only when
trustManager/keyManager is setup in FtpsSettings
+
+ val keyManager: X509ExtendedKeyManager = mock[X509ExtendedKeyManager]
+ val trustManager: X509ExtendedTrustManager = mock[X509ExtendedTrustManager]
+
+
doNothing().when(trustManager).checkServerTrusted(any(classOf[Array[X509Certificate]]),
anyString,
+ any(classOf[Socket]))
+
+ override val settings =
+ FtpsSettings(
+ InetAddress.getByName(HOSTNAME)).withPort(PORT)
+ .withCredentials(CREDENTIALS)
+ .withBinary(true)
+ .withPassiveMode(true)
+ .withTrustManager(trustManager)
+ .withKeyManager(keyManager)
+
+ private def verifyServerCheckCertificate(): Unit =
+ verify(trustManager,
atLeastOnce()).checkServerTrusted(any(classOf[Array[X509Certificate]]),
anyString,
+ any(classOf[Socket]))
+
+ private def verifyAfterStream[O, Mat](source: Source[O, Mat]): Source[O,
Mat] =
+ source.map { result =>
+ verifyServerCheckCertificate()
+ result
+ }
+
+ private def verifyAfterStream[I, Mat](sink: Sink[I, Mat]): Sink[I, Mat] =
+ sink.mapMaterializedValue { result =>
+ verifyServerCheckCertificate()
+ result
+ }
+
+ override protected def listFiles(basePath: String): Source[FtpFile, NotUsed]
=
+ verifyAfterStream(super.listFiles(basePath))
+
+ override protected def listFilesWithFilter(basePath: String, branchSelector:
FtpFile => Boolean,
+ emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] =
+ verifyAfterStream(super.listFilesWithFilter(basePath, branchSelector,
emitTraversedDirectories))
+
+ override protected def retrieveFromPath(path: String, fromRoot: Boolean):
Source[ByteString, Future[IOResult]] =
+ verifyAfterStream(super.retrieveFromPath(path, fromRoot))
+
+ override protected def retrieveFromPathWithOffset(path: String, offset:
Long): Source[ByteString, Future[IOResult]] =
+ verifyAfterStream(super.retrieveFromPathWithOffset(path, offset))
+
+ override protected def storeToPath(path: String, append: Boolean):
Sink[ByteString, Future[IOResult]] =
+ verifyAfterStream(super.storeToPath(path, append))
+
+ override protected def remove(): Sink[FtpFile, Future[IOResult]] =
+ verifyAfterStream(super.remove())
+
+ override protected def move(destinationPath: FtpFile => String):
Sink[FtpFile, Future[IOResult]] =
+ verifyAfterStream(super.move(destinationPath))
+
+ override protected def mkdir(basePath: String, name: String): Source[Done,
NotUsed] =
+ verifyAfterStream(super.mkdir(basePath, name))
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d3183886c..13e221d46 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -173,7 +173,7 @@ object Dependencies {
val Ftp = Seq(
libraryDependencies ++= Seq(
"commons-net" % "commons-net" % "3.8.0",
- "com.hierynomus" % "sshj" % "0.33.0"))
+ "com.hierynomus" % "sshj" % "0.33.0") ++ Mockito)
val GeodeVersion = "1.15.0"
val GeodeVersionForDocs = "115"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]