This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 401da8b68 Rename none credentials to access-token and remove token from none (#1021)
401da8b68 is described below
commit 401da8b68fb02b246ee54e4d68f4a246950e6468
Author: Matthew de Detrich <[email protected]>
AuthorDate: Sun Jul 13 07:12:48 2025 +0200
Rename none credentials to access-token and remove token from none (#1021)
---
.../src/main/resources/reference.conf | 1 -
.../bigquery/scaladsl/BigQueryQueriesSpec.scala | 2 +-
.../src/test/resources/application.conf | 1 -
.../google/AuthorizationHeaderSpec.scala | 97 ++++++++++++++++++++++
.../googlecloud/pubsub/PubSubMockSpec.scala | 70 ++++++++++++++++
.../googlecloud/pubsub/impl/PubSubApiSpec.scala | 77 ++++-------------
google-common/src/main/resources/reference.conf | 10 ++-
...dentials.scala => AccessTokenCredentials.scala} | 18 ++--
.../connectors/google/auth/Credentials.scala | 28 +++++--
.../connectors/google/auth/NoCredentials.scala | 10 +--
.../connectors/google/auth/OAuth2Credentials.scala | 3 +-
.../stream/connectors/google/http/GoogleHttp.scala | 16 ++--
google-common/src/test/resources/application.conf | 4 +-
.../connectors/google/http/GoogleHttpSpec.scala | 7 +-
.../firebase/fcm/v1/impl/FcmSenderSpec.scala | 2 -
15 files changed, 241 insertions(+), 105 deletions(-)
diff --git a/google-cloud-bigquery-storage/src/main/resources/reference.conf b/google-cloud-bigquery-storage/src/main/resources/reference.conf
index 9494f71ae..4210f5fc6 100644
--- a/google-cloud-bigquery-storage/src/main/resources/reference.conf
+++ b/google-cloud-bigquery-storage/src/main/resources/reference.conf
@@ -8,7 +8,6 @@ pekko.connectors.google {
provider = none
none {
project-id = "pekko-connectors-google-test"
- token = "yyyy.c.an-access-token"
}
}
diff --git a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala
index a02043b99..1aa488079 100644
--- a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala
+++ b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala
@@ -50,7 +50,7 @@ class BigQueryQueriesSpec
jsonFormat10(QueryResponse[T])
}
- implicit val settings: GoogleSettings = GoogleSettings().copy(credentials =
NoCredentials("", ""))
+ implicit val settings: GoogleSettings = GoogleSettings().copy(credentials =
NoCredentials(""))
val jobId = "jobId"
val pageToken = "pageToken"
diff --git a/google-cloud-pub-sub/src/test/resources/application.conf b/google-cloud-pub-sub/src/test/resources/application.conf
index 81cf6ad95..7cebb6a78 100644
--- a/google-cloud-pub-sub/src/test/resources/application.conf
+++ b/google-cloud-pub-sub/src/test/resources/application.conf
@@ -29,7 +29,6 @@ pekko.connectors.google {
provider = none
none {
project-id = "pekko-connectors"
- token = "TESTTOKEN"
}
}
}
diff --git a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/google/AuthorizationHeaderSpec.scala b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/google/AuthorizationHeaderSpec.scala
new file mode 100644
index 000000000..55b75b319
--- /dev/null
+++ b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/google/AuthorizationHeaderSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.google
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.model.headers.OAuth2BearerToken
+import pekko.stream.connectors.googlecloud.pubsub.impl.TestCredentials
+import pekko.stream.connectors.googlecloud.pubsub.{ PubSubMockSpec,
PublishMessage, PublishRequest }
+import pekko.stream.connectors.testkit.scaladsl.LogCapturing
+import pekko.stream.connectors.google.auth.{ Credentials,
RetrievableCredentials }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock.{ aResponse, urlEqualTo
}
+import com.typesafe.config.ConfigFactory
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{ when, withSettings }
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.mockito.MockitoSugar
+
+import java.util.Base64
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.collection.immutable.Seq
+import scala.concurrent.duration._
+
+class AuthorizationHeaderSpec extends AnyFlatSpec with BeforeAndAfterAll with
ScalaFutures with Matchers
+ with LogCapturing
+ with PubSubMockSpec with MockitoSugar {
+
+ implicit val system: ActorSystem = ActorSystem(
+ "AuthorizationHeaderSpec",
+ ConfigFactory
+ .parseString(
+ s"pekko.connectors.google.credentials.none.project-id =
${TestCredentials.projectId}")
+ .withFallback(ConfigFactory.load()))
+
+ implicit val defaultPatience: PatienceConfig =
+ PatienceConfig(timeout = 5.seconds, interval = 100.millis)
+
+ it should "publish with auth header" in {
+
+ val accessToken = "yyyy.c.an-access-token"
+ val credentials =
+ mock[Credentials with
RetrievableCredentials](withSettings().extraInterfaces(classOf[RetrievableCredentials]))
+
+ when(credentials.get()(any[ExecutionContext],
any[RequestSettings])).thenReturn(
+ Future.successful(OAuth2BearerToken(accessToken))
+ )
+
+ val publishMessage =
+ PublishMessage(
+ data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)),
+ attributes = Map("row_id" -> "7"))
+ val publishRequest = PublishRequest(Seq(publishMessage))
+
+ val expectedPublishRequest =
+
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
+ val publishResponse = """{"messageIds":["1"]}"""
+
+ wireMock.register(
+ WireMock
+ .post(
+
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
+ .withRequestBody(WireMock.equalToJson(expectedPublishRequest))
+ .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withBody(publishResponse)
+ .withHeader("Content-Type", "application/json")))
+ val flow = TestHttpApi.publish[Unit]("topic1", 1)
+ val result =
+ Source.single((publishRequest, ())).via(flow)
+
.withAttributes(GoogleAttributes.settings(GoogleSettings().copy(credentials =
credentials)))
+ .toMat(Sink.head)(Keep.right).run()
+ result.futureValue._1.messageIds shouldBe Seq("1")
+ result.futureValue._2 shouldBe (())
+ }
+}
diff --git a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/PubSubMockSpec.scala b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/PubSubMockSpec.scala
new file mode 100644
index 000000000..65bee84a8
--- /dev/null
+++ b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/PubSubMockSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.googlecloud.pubsub
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.{ ConnectionContext, Http }
+import pekko.stream.connectors.googlecloud.pubsub.impl.{ NoopTrustManager,
PubSubApi }
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier
+import
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
+
+import javax.net.ssl.{ SSLContext, SSLEngine }
+import org.scalatest.{ BeforeAndAfterAll, Suite }
+
+trait PubSubMockSpec extends Suite with BeforeAndAfterAll {
+ implicit val system: ActorSystem
+
+ def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
+ val sslContext: SSLContext = SSLContext.getInstance("TLS")
+ sslContext.init(null, Array(new NoopTrustManager()), null)
+
+ val engine = sslContext.createSSLEngine(host, port)
+ engine.setUseClientMode(true)
+
+ engine
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
Http().setDefaultClientHttpsContext(ConnectionContext.httpsClient(createInsecureSslEngine
_))
+ }
+
+ val wiremockServer = new WireMockServer(
+ wireMockConfig().dynamicPort().dynamicHttpsPort().notifier(new
ConsoleNotifier(false)))
+ wiremockServer.start()
+
+ val wireMock = new WireMock("localhost", wiremockServer.port())
+
+ object TestHttpApi extends PubSubApi {
+ val isEmulated = false
+ val PubSubGoogleApisHost = "localhost"
+ val PubSubGoogleApisPort = wiremockServer.httpsPort()
+ }
+
+ object TestEmulatorHttpApi extends PubSubApi {
+ override val isEmulated = true
+ val PubSubGoogleApisHost = "localhost"
+ val PubSubGoogleApisPort = wiremockServer.port()
+ }
+
+ val config = PubSubConfig()
+
+}
diff --git a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala
index f443719d1..c01634bc6 100644
--- a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala
+++ b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala
@@ -16,15 +16,11 @@ package
org.apache.pekko.stream.connectors.googlecloud.pubsub.impl
import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
-import pekko.http.scaladsl.{ ConnectionContext, Http }
import pekko.stream.connectors.googlecloud.pubsub._
import pekko.stream.connectors.testkit.scaladsl.LogCapturing
import pekko.stream.scaladsl.{ Keep, Sink, Source }
-import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.client.WireMock.{ aResponse, urlEqualTo
}
-import com.github.tomakehurst.wiremock.common.ConsoleNotifier
-import
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
@@ -34,7 +30,7 @@ import org.scalatest.matchers.should.Matchers
import java.security.cert.X509Certificate
import java.time.Instant
import java.util.Base64
-import javax.net.ssl.{ SSLContext, SSLEngine, X509TrustManager }
+import javax.net.ssl.X509TrustManager
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -51,7 +47,8 @@ class NoopTrustManager extends X509TrustManager {
}
}
-class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with
ScalaFutures with Matchers with LogCapturing {
+class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with
ScalaFutures with Matchers with LogCapturing
+ with PubSubMockSpec {
implicit val system: ActorSystem = ActorSystem(
"PubSubApiSpec",
@@ -63,40 +60,6 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = 5.seconds, interval = 100.millis)
- def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
- val sslContext: SSLContext = SSLContext.getInstance("TLS")
- sslContext.init(null, Array(new NoopTrustManager()), null)
-
- val engine = sslContext.createSSLEngine(host, port)
- engine.setUseClientMode(true)
-
- engine
- }
-
-
Http().setDefaultClientHttpsContext(ConnectionContext.httpsClient(createInsecureSslEngine
_))
-
- val wiremockServer = new WireMockServer(
- wireMockConfig().dynamicPort().dynamicHttpsPort().notifier(new
ConsoleNotifier(false)))
- wiremockServer.start()
-
- val mock = new WireMock("localhost", wiremockServer.port())
-
- private object TestHttpApi extends PubSubApi {
- val isEmulated = false
- val PubSubGoogleApisHost = "localhost"
- val PubSubGoogleApisPort = wiremockServer.httpsPort()
- }
-
- private object TestEmulatorHttpApi extends PubSubApi {
- override val isEmulated = true
- val PubSubGoogleApisHost = "localhost"
- val PubSubGoogleApisPort = wiremockServer.port()
- }
-
- val config = PubSubConfig()
-
- val accessToken = "TESTTOKEN"
-
it should "publish" in {
val publishMessage =
@@ -109,12 +72,11 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
val publishResponse = """{"messageIds":["1"]}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(expectedPublishRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(
aResponse()
.withStatus(200)
@@ -140,12 +102,11 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"},"orderingKey":"my-ordering-key"}]}"""
val publishResponse = """{"messageIds":["1"]}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(expectedPublishRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(
aResponse()
.withStatus(200)
@@ -175,12 +136,11 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}"""
val publishResponse = """{"messageIds":["1"]}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(expectedPublishRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(
aResponse()
.withStatus(200)
@@ -203,7 +163,7 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}"""
val publishResponse = """{"messageIds":["1"]}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
@@ -236,13 +196,12 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(pullRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type",
"application/json")))
val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -266,13 +225,12 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(pullRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type",
"application/json")))
val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -295,7 +253,7 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(
@@ -317,13 +275,12 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(pullRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type",
"application/json")))
val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -339,13 +296,12 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(pullRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(aResponse().withStatus(418).withBody(pullResponse).withHeader("Content-Type",
"application/json")))
val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -358,13 +314,12 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
it should "acknowledge" in {
val ackRequest = """{"ackIds":["ack1"]}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:acknowledge?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(ackRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(aResponse().withStatus(200)))
val acknowledgeRequest = AcknowledgeRequest("ack1")
@@ -378,13 +333,12 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
it should "fail acknowledge when result code is not success" in {
val ackRequest = """{"ackIds":["ack1"]}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:acknowledge?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(ackRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(aResponse().withStatus(401)))
val acknowledgeRequest = AcknowledgeRequest("ack1")
@@ -406,12 +360,11 @@ class PubSubApiSpec extends AnyFlatSpec with
BeforeAndAfterAll with ScalaFutures
val expectedPublishRequest =
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
- mock.register(
+ wireMock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
.withRequestBody(WireMock.equalToJson(expectedPublishRequest))
- .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(
aResponse()
.withStatus(404)
diff --git a/google-common/src/main/resources/reference.conf b/google-common/src/main/resources/reference.conf
index 48a33ca28..63e79b552 100644
--- a/google-common/src/main/resources/reference.conf
+++ b/google-common/src/main/resources/reference.conf
@@ -8,8 +8,8 @@ pekko.connectors.google {
# Override to specify custom scopes
scopes = ${pekko.connectors.google.credentials.default-scopes}
- # Options: application-default, service-account, compute-engine, none
- # application-default first tries service-account then compute-engine
+ # Options: application-default, service-account, compute-engine,
access-token, none
+ # application-default first tries service-account, compute-engine then
access-token
provider = application-default
service-account {
@@ -48,9 +48,13 @@ pekko.connectors.google {
path = ${?GOOGLE_APPLICATION_CREDENTIALS}
}
+ access-token {
+ project-id = ""
+ token = ""
+ }
+
none {
project-id = "<no-project-id>"
- token = "<no-token>"
}
}
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/AccessTokenCredentials.scala
similarity index 69%
copy from google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
copy to google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/AccessTokenCredentials.scala
index 6bff78f49..7e15eb3f0 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/AccessTokenCredentials.scala
@@ -25,25 +25,25 @@ import java.util
import scala.concurrent.{ ExecutionContext, Future }
@InternalApi
-private[connectors] object NoCredentials {
-
- def apply(c: Config): NoCredentials =
NoCredentials(c.getString("project-id"), c.getString("token"))
-
+private[auth] object AccessTokenCredentials {
+ def apply(c: Config): AccessTokenCredentials =
AccessTokenCredentials(c.getString("project-id"), c.getString("token"))
}
@InternalApi
-private[connectors] final case class NoCredentials(projectId: String, token:
String) extends Credentials {
+private[auth] final case class AccessTokenCredentials(projectId: String,
accessToken: String) extends Credentials
+ with RetrievableCredentials {
- private val futureToken = Future.successful(OAuth2BearerToken(token))
+ private val futureToken = Future.successful(OAuth2BearerToken(accessToken))
override def get()(implicit ec: ExecutionContext, settings:
RequestSettings): Future[OAuth2BearerToken] =
futureToken
override def asGoogle(implicit ec: ExecutionContext, settings:
RequestSettings): GoogleCredentials =
new GoogleCredentials {
- override def getAuthenticationType: String = "<none>"
- override def getRequestMetadata(uri: URI): util.Map[String,
util.List[String]] = util.Collections.emptyMap()
- override def hasRequestMetadata: Boolean = false
+ override def getAuthenticationType: String = "OAuth2"
+ override def getRequestMetadata(uri: URI): util.Map[String,
util.List[String]] =
+ util.Collections.singletonMap("Authorization",
util.Collections.singletonList(accessToken))
+ override def hasRequestMetadata: Boolean = true
override def hasRequestMetadataOnly: Boolean = true
override def refresh(): Unit = ()
}
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
index 7d1ec3e24..61d169b2f 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
@@ -56,17 +56,26 @@ object Credentials {
creds
} catch {
case NonFatal(ex3) =>
- log.warning("Unable to find Application Default Credentials
for Google APIs")
- log.warning("Service account: {}", ex1.getMessage)
- log.warning("Compute Engine: {}", ex2.getMessage)
- log.warning("User access: {}", ex3.getMessage)
- parseNone(c) // TODO Once credentials are guaranteed to be
managed centrally we can throw an error instead
+ try {
+ val creds = parseAccessToken(c)
+ log.info("Using access token credentials")
+ creds
+ } catch {
+ case NonFatal(ex4) =>
+ log.warning("Unable to find Application Default
Credentials for Google APIs")
+ log.warning("Service account: {}", ex1.getMessage)
+ log.warning("Compute Engine: {}", ex2.getMessage)
+ log.warning("User access: {}", ex3.getMessage)
+ log.warning("Access token: {}", ex4.getMessage)
+ parseNone(c) // TODO Once credentials are guaranteed to
be managed centrally we can throw an error instead
+ }
}
}
}
case "service-account" => parseServiceAccount(c)
case "compute-engine" => parseComputeEngine(c)
case "user-access" => parseUserAccess(c)
+ case "access-token" => parseAccessToken(c)
case "none" => parseNone(c)
}
@@ -83,6 +92,8 @@ object Credentials {
private def parseUserAccess(c: Config)(implicit system:
ClassicActorSystemProvider) =
UserAccessCredentials(c.getConfig("user-access"))
+ private def parseAccessToken(c: Config) =
AccessTokenCredentials(c.getConfig("access-token"))
+
private def parseNone(c: Config) = NoCredentials(c.getConfig("none"))
private var _cache: Map[Any, Credentials] = ListMap.empty
@@ -96,6 +107,11 @@ object Credentials {
}
+@DoNotInherit
+private[google] trait RetrievableCredentials {
+ private[google] def get()(implicit ec: ExecutionContext, settings:
RequestSettings): Future[HttpCredentials]
+}
+
/**
* Credentials for accessing Google APIs
*/
@@ -104,8 +120,6 @@ abstract class Credentials private[auth] () {
private[google] def projectId: String
- private[google] def get()(implicit ec: ExecutionContext, settings:
RequestSettings): Future[HttpCredentials]
-
/**
* Wraps these credentials as a [[com.google.auth.Credentials]] for interop
with Google's Java client libraries.
* @param ec the [[scala.concurrent.ExecutionContext]] to use for blocking
requests if credentials are requested synchronously
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
index 6bff78f49..599d5f4f6 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
@@ -27,18 +27,12 @@ import scala.concurrent.{ ExecutionContext, Future }
@InternalApi
private[connectors] object NoCredentials {
- def apply(c: Config): NoCredentials =
NoCredentials(c.getString("project-id"), c.getString("token"))
+ def apply(c: Config): NoCredentials =
NoCredentials(c.getString("project-id"))
}
@InternalApi
-private[connectors] final case class NoCredentials(projectId: String, token:
String) extends Credentials {
-
- private val futureToken = Future.successful(OAuth2BearerToken(token))
-
- override def get()(implicit ec: ExecutionContext, settings:
RequestSettings): Future[OAuth2BearerToken] =
- futureToken
-
+private[connectors] final case class NoCredentials(projectId: String) extends
Credentials {
override def asGoogle(implicit ec: ExecutionContext, settings:
RequestSettings): GoogleCredentials =
new GoogleCredentials {
override def getAuthenticationType: String = "<none>"
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala
index 15260873a..7ffaa385b 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala
@@ -34,7 +34,8 @@ private[auth] object OAuth2Credentials {
}
@InternalApi
-private[auth] abstract class OAuth2Credentials(val projectId: String)(implicit
mat: Materializer) extends Credentials {
+private[auth] abstract class OAuth2Credentials(val projectId: String)(implicit
mat: Materializer) extends Credentials
+ with RetrievableCredentials {
private val tokenStream = stream.run()
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala
index 1ad022d88..e7556ab59 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala
@@ -23,6 +23,7 @@ import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal
}
import pekko.http.scaladsl.{ Http, HttpExt }
import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings,
RequestSettings, RetrySettings }
+import pekko.stream.connectors.google.auth.RetrievableCredentials
import pekko.stream.connectors.google.util.Retry
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow }
@@ -166,10 +167,15 @@ private[connectors] final class GoogleHttp private (val
http: HttpExt) extends A
private def addAuth(request: HttpRequest)(implicit settings:
GoogleSettings): Future[HttpRequest] = {
implicit val requestSettings: RequestSettings = settings.requestSettings
- settings.credentials
- .get()
- .map { token =>
- request.addHeader(Authorization(token))
- }(ExecutionContexts.parasitic)
+ settings.credentials match {
+ case retrievable: RetrievableCredentials =>
+ retrievable
+ .get()
+ .map { token =>
+ request.addHeader(Authorization(token))
+ }(ExecutionContexts.parasitic)
+ case _ =>
+ Future.successful(request)
+ }
}
}
diff --git a/google-common/src/test/resources/application.conf b/google-common/src/test/resources/application.conf
index d2eab0de4..0c6ab64d3 100644
--- a/google-common/src/test/resources/application.conf
+++ b/google-common/src/test/resources/application.conf
@@ -3,8 +3,8 @@
pekko.connectors.google {
credentials {
- provider = none
- none {
+ provider = access-token
+ access-token {
project-id = "pekko-connectors-google-test"
token = "yyyy.c.an-access-token"
}
diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala
index cc37d94f2..fa0babf4b 100644
--- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala
+++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala
@@ -25,13 +25,13 @@ import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
import pekko.http.scaladsl.settings.ConnectionPoolSettings
import pekko.http.scaladsl.{ HttpExt, HttpsConnectionContext }
-import pekko.stream.connectors.google.auth.{ Credentials,
GoogleOAuth2Exception }
+import pekko.stream.connectors.google.auth.{ Credentials,
GoogleOAuth2Exception, RetrievableCredentials }
import pekko.stream.connectors.google.implicits._
import pekko.stream.connectors.google.{ GoogleHttpException, GoogleSettings,
RequestSettings }
import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
import pekko.testkit.TestKit
import org.mockito.ArgumentMatchers.{ any, anyInt, argThat }
-import org.mockito.Mockito.when
+import org.mockito.Mockito.{ when, withSettings }
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
@@ -160,7 +160,8 @@ class GoogleHttpSpec
final class AnotherException extends RuntimeException
- val credentials = mock[Credentials]
+ val credentials =
+ mock[Credentials with
RetrievableCredentials](withSettings().extraInterfaces(classOf[RetrievableCredentials]))
when(credentials.get()(any[ExecutionContext],
any[RequestSettings])).thenReturn(
Future.failed(GoogleOAuth2Exception(ErrorInfo())),
Future.failed(new AnotherException))
diff --git a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
index ca6c18820..8d4e010c6 100644
--- a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
+++ b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
@@ -83,8 +83,6 @@ class FcmSenderSpec
val request: HttpRequest = captor.getValue
Unmarshal(request.entity).to[FcmSend].futureValue shouldBe
FcmSend(false, FcmNotification.empty)
request.uri.toString should
startWith("https://fcm.googleapis.com/v1/projects/projectId/messages:send")
- request.headers.size shouldBe 1
- request.headers.head should matchPattern { case
HttpHeader("authorization", "Bearer <no-token>") => }
}
"parse the success response correctly" in {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]