This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 401da8b68 Rename none credentials to access-token and remove token from none (#1021)
401da8b68 is described below

commit 401da8b68fb02b246ee54e4d68f4a246950e6468
Author: Matthew de Detrich <[email protected]>
AuthorDate: Sun Jul 13 07:12:48 2025 +0200

    Rename none credentials to access-token and remove token from none (#1021)
---
 .../src/main/resources/reference.conf              |  1 -
 .../bigquery/scaladsl/BigQueryQueriesSpec.scala    |  2 +-
 .../src/test/resources/application.conf            |  1 -
 .../google/AuthorizationHeaderSpec.scala           | 97 ++++++++++++++++++++++
 .../googlecloud/pubsub/PubSubMockSpec.scala        | 70 ++++++++++++++++
 .../googlecloud/pubsub/impl/PubSubApiSpec.scala    | 77 ++++-------------
 google-common/src/main/resources/reference.conf    | 10 ++-
 ...dentials.scala => AccessTokenCredentials.scala} | 18 ++--
 .../connectors/google/auth/Credentials.scala       | 28 +++++--
 .../connectors/google/auth/NoCredentials.scala     | 10 +--
 .../connectors/google/auth/OAuth2Credentials.scala |  3 +-
 .../stream/connectors/google/http/GoogleHttp.scala | 16 ++--
 google-common/src/test/resources/application.conf  |  4 +-
 .../connectors/google/http/GoogleHttpSpec.scala    |  7 +-
 .../firebase/fcm/v1/impl/FcmSenderSpec.scala       |  2 -
 15 files changed, 241 insertions(+), 105 deletions(-)

diff --git a/google-cloud-bigquery-storage/src/main/resources/reference.conf 
b/google-cloud-bigquery-storage/src/main/resources/reference.conf
index 9494f71ae..4210f5fc6 100644
--- a/google-cloud-bigquery-storage/src/main/resources/reference.conf
+++ b/google-cloud-bigquery-storage/src/main/resources/reference.conf
@@ -8,7 +8,6 @@ pekko.connectors.google {
     provider = none
     none {
       project-id = "pekko-connectors-google-test"
-      token = "yyyy.c.an-access-token"
     }
   }
 
diff --git 
a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala
 
b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala
index a02043b99..1aa488079 100644
--- 
a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala
+++ 
b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala
@@ -50,7 +50,7 @@ class BigQueryQueriesSpec
     jsonFormat10(QueryResponse[T])
   }
 
-  implicit val settings: GoogleSettings = GoogleSettings().copy(credentials = 
NoCredentials("", ""))
+  implicit val settings: GoogleSettings = GoogleSettings().copy(credentials = 
NoCredentials(""))
 
   val jobId = "jobId"
   val pageToken = "pageToken"
diff --git a/google-cloud-pub-sub/src/test/resources/application.conf 
b/google-cloud-pub-sub/src/test/resources/application.conf
index 81cf6ad95..7cebb6a78 100644
--- a/google-cloud-pub-sub/src/test/resources/application.conf
+++ b/google-cloud-pub-sub/src/test/resources/application.conf
@@ -29,7 +29,6 @@ pekko.connectors.google {
     provider = none
     none {
       project-id = "pekko-connectors"
-      token = "TESTTOKEN"
     }
   }
 }
diff --git 
a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/google/AuthorizationHeaderSpec.scala
 
b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/google/AuthorizationHeaderSpec.scala
new file mode 100644
index 000000000..55b75b319
--- /dev/null
+++ 
b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/google/AuthorizationHeaderSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.google
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.model.headers.OAuth2BearerToken
+import pekko.stream.connectors.googlecloud.pubsub.impl.TestCredentials
+import pekko.stream.connectors.googlecloud.pubsub.{ PubSubMockSpec, 
PublishMessage, PublishRequest }
+import pekko.stream.connectors.testkit.scaladsl.LogCapturing
+import pekko.stream.connectors.google.auth.{ Credentials, 
RetrievableCredentials }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock.{ aResponse, urlEqualTo 
}
+import com.typesafe.config.ConfigFactory
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{ when, withSettings }
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.mockito.MockitoSugar
+
+import java.util.Base64
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.collection.immutable.Seq
+import scala.concurrent.duration._
+
+class AuthorizationHeaderSpec extends AnyFlatSpec with BeforeAndAfterAll with 
ScalaFutures with Matchers
+    with LogCapturing
+    with PubSubMockSpec with MockitoSugar {
+
+  implicit val system: ActorSystem = ActorSystem(
+    "AuthorizationHeaderSpec",
+    ConfigFactory
+      .parseString(
+        s"pekko.connectors.google.credentials.none.project-id = 
${TestCredentials.projectId}")
+      .withFallback(ConfigFactory.load()))
+
+  implicit val defaultPatience: PatienceConfig =
+    PatienceConfig(timeout = 5.seconds, interval = 100.millis)
+
+  it should "publish with auth header" in {
+
+    val accessToken = "yyyy.c.an-access-token"
+    val credentials =
+      mock[Credentials with 
RetrievableCredentials](withSettings().extraInterfaces(classOf[RetrievableCredentials]))
+
+    when(credentials.get()(any[ExecutionContext], 
any[RequestSettings])).thenReturn(
+      Future.successful(OAuth2BearerToken(accessToken))
+    )
+
+    val publishMessage =
+      PublishMessage(
+        data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)),
+        attributes = Map("row_id" -> "7"))
+    val publishRequest = PublishRequest(Seq(publishMessage))
+
+    val expectedPublishRequest =
+      
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
+    val publishResponse = """{"messageIds":["1"]}"""
+
+    wireMock.register(
+      WireMock
+        .post(
+          
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
+        .withRequestBody(WireMock.equalToJson(expectedPublishRequest))
+        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
+        .willReturn(
+          aResponse()
+            .withStatus(200)
+            .withBody(publishResponse)
+            .withHeader("Content-Type", "application/json")))
+    val flow = TestHttpApi.publish[Unit]("topic1", 1)
+    val result =
+      Source.single((publishRequest, ())).via(flow)
+        
.withAttributes(GoogleAttributes.settings(GoogleSettings().copy(credentials = 
credentials)))
+        .toMat(Sink.head)(Keep.right).run()
+    result.futureValue._1.messageIds shouldBe Seq("1")
+    result.futureValue._2 shouldBe (())
+  }
+}
diff --git 
a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/PubSubMockSpec.scala
 
b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/PubSubMockSpec.scala
new file mode 100644
index 000000000..65bee84a8
--- /dev/null
+++ 
b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/PubSubMockSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.googlecloud.pubsub
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.{ ConnectionContext, Http }
+import pekko.stream.connectors.googlecloud.pubsub.impl.{ NoopTrustManager, 
PubSubApi }
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier
+import 
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
+
+import javax.net.ssl.{ SSLContext, SSLEngine }
+import org.scalatest.{ BeforeAndAfterAll, Suite }
+
+trait PubSubMockSpec extends Suite with BeforeAndAfterAll {
+  implicit val system: ActorSystem
+
+  def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
+    val sslContext: SSLContext = SSLContext.getInstance("TLS")
+    sslContext.init(null, Array(new NoopTrustManager()), null)
+
+    val engine = sslContext.createSSLEngine(host, port)
+    engine.setUseClientMode(true)
+
+    engine
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    
Http().setDefaultClientHttpsContext(ConnectionContext.httpsClient(createInsecureSslEngine
 _))
+  }
+
+  val wiremockServer = new WireMockServer(
+    wireMockConfig().dynamicPort().dynamicHttpsPort().notifier(new 
ConsoleNotifier(false)))
+  wiremockServer.start()
+
+  val wireMock = new WireMock("localhost", wiremockServer.port())
+
+  object TestHttpApi extends PubSubApi {
+    val isEmulated = false
+    val PubSubGoogleApisHost = "localhost"
+    val PubSubGoogleApisPort = wiremockServer.httpsPort()
+  }
+
+  object TestEmulatorHttpApi extends PubSubApi {
+    override val isEmulated = true
+    val PubSubGoogleApisHost = "localhost"
+    val PubSubGoogleApisPort = wiremockServer.port()
+  }
+
+  val config = PubSubConfig()
+
+}
diff --git 
a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala
 
b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala
index f443719d1..c01634bc6 100644
--- 
a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala
+++ 
b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala
@@ -16,15 +16,11 @@ package 
org.apache.pekko.stream.connectors.googlecloud.pubsub.impl
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorSystem
-import pekko.http.scaladsl.{ ConnectionContext, Http }
 import pekko.stream.connectors.googlecloud.pubsub._
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.scaladsl.{ Keep, Sink, Source }
-import com.github.tomakehurst.wiremock.WireMockServer
 import com.github.tomakehurst.wiremock.client.WireMock
 import com.github.tomakehurst.wiremock.client.WireMock.{ aResponse, urlEqualTo 
}
-import com.github.tomakehurst.wiremock.common.ConsoleNotifier
-import 
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
 import com.typesafe.config.ConfigFactory
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.ScalaFutures
@@ -34,7 +30,7 @@ import org.scalatest.matchers.should.Matchers
 import java.security.cert.X509Certificate
 import java.time.Instant
 import java.util.Base64
-import javax.net.ssl.{ SSLContext, SSLEngine, X509TrustManager }
+import javax.net.ssl.X509TrustManager
 import scala.collection.immutable.Seq
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -51,7 +47,8 @@ class NoopTrustManager extends X509TrustManager {
   }
 }
 
-class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with 
ScalaFutures with Matchers with LogCapturing {
+class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with 
ScalaFutures with Matchers with LogCapturing
+    with PubSubMockSpec {
 
   implicit val system: ActorSystem = ActorSystem(
     "PubSubApiSpec",
@@ -63,40 +60,6 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
   implicit val defaultPatience: PatienceConfig =
     PatienceConfig(timeout = 5.seconds, interval = 100.millis)
 
-  def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
-    val sslContext: SSLContext = SSLContext.getInstance("TLS")
-    sslContext.init(null, Array(new NoopTrustManager()), null)
-
-    val engine = sslContext.createSSLEngine(host, port)
-    engine.setUseClientMode(true)
-
-    engine
-  }
-
-  
Http().setDefaultClientHttpsContext(ConnectionContext.httpsClient(createInsecureSslEngine
 _))
-
-  val wiremockServer = new WireMockServer(
-    wireMockConfig().dynamicPort().dynamicHttpsPort().notifier(new 
ConsoleNotifier(false)))
-  wiremockServer.start()
-
-  val mock = new WireMock("localhost", wiremockServer.port())
-
-  private object TestHttpApi extends PubSubApi {
-    val isEmulated = false
-    val PubSubGoogleApisHost = "localhost"
-    val PubSubGoogleApisPort = wiremockServer.httpsPort()
-  }
-
-  private object TestEmulatorHttpApi extends PubSubApi {
-    override val isEmulated = true
-    val PubSubGoogleApisHost = "localhost"
-    val PubSubGoogleApisPort = wiremockServer.port()
-  }
-
-  val config = PubSubConfig()
-
-  val accessToken = "TESTTOKEN"
-
   it should "publish" in {
 
     val publishMessage =
@@ -109,12 +72,11 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
       
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
     val publishResponse = """{"messageIds":["1"]}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(expectedPublishRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         .willReturn(
           aResponse()
             .withStatus(200)
@@ -140,12 +102,11 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
       
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"},"orderingKey":"my-ordering-key"}]}"""
     val publishResponse = """{"messageIds":["1"]}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(expectedPublishRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         .willReturn(
           aResponse()
             .withStatus(200)
@@ -175,12 +136,11 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
       """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}"""
     val publishResponse = """{"messageIds":["1"]}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(expectedPublishRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         .willReturn(
           aResponse()
             .withStatus(200)
@@ -203,7 +163,7 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
       """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}"""
     val publishResponse = """{"messageIds":["1"]}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
@@ -236,13 +196,12 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
 
     val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           urlEqualTo(
             
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(pullRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         
.willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type",
 "application/json")))
 
     val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -266,13 +225,12 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
 
     val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           urlEqualTo(
             
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(pullRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         
.willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type",
 "application/json")))
 
     val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -295,7 +253,7 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
 
     val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           urlEqualTo(
@@ -317,13 +275,12 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
 
     val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           urlEqualTo(
             
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(pullRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         
.willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type",
 "application/json")))
 
     val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -339,13 +296,12 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
 
     val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           urlEqualTo(
             
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(pullRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         
.willReturn(aResponse().withStatus(418).withBody(pullResponse).withHeader("Content-Type",
 "application/json")))
 
     val flow = TestHttpApi.pull("sub1", true, 1000)
@@ -358,13 +314,12 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
 
   it should "acknowledge" in {
     val ackRequest = """{"ackIds":["ack1"]}"""
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           urlEqualTo(
             
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:acknowledge?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(ackRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         .willReturn(aResponse().withStatus(200)))
 
     val acknowledgeRequest = AcknowledgeRequest("ack1")
@@ -378,13 +333,12 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
 
   it should "fail acknowledge when result code is not success" in {
     val ackRequest = """{"ackIds":["ack1"]}"""
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           urlEqualTo(
             
s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:acknowledge?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(ackRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         .willReturn(aResponse().withStatus(401)))
 
     val acknowledgeRequest = AcknowledgeRequest("ack1")
@@ -406,12 +360,11 @@ class PubSubApiSpec extends AnyFlatSpec with 
BeforeAndAfterAll with ScalaFutures
     val expectedPublishRequest =
       
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
 
-    mock.register(
+    wireMock.register(
       WireMock
         .post(
           
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false"))
         .withRequestBody(WireMock.equalToJson(expectedPublishRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
         .willReturn(
           aResponse()
             .withStatus(404)
diff --git a/google-common/src/main/resources/reference.conf 
b/google-common/src/main/resources/reference.conf
index 48a33ca28..63e79b552 100644
--- a/google-common/src/main/resources/reference.conf
+++ b/google-common/src/main/resources/reference.conf
@@ -8,8 +8,8 @@ pekko.connectors.google {
     # Override to specify custom scopes
     scopes = ${pekko.connectors.google.credentials.default-scopes}
 
-    # Options: application-default, service-account, compute-engine, none
-    # application-default first tries service-account then compute-engine
+    # Options: application-default, service-account, compute-engine, user-access, access-token, none
+    # application-default tries service-account, compute-engine, user-access, then access-token, and falls back to none
     provider = application-default
 
     service-account {
@@ -48,9 +48,13 @@ pekko.connectors.google {
       path = ${?GOOGLE_APPLICATION_CREDENTIALS}
     }
 
+    access-token {
+      project-id = ""
+      token = ""
+    }
+
     none {
       project-id = "<no-project-id>"
-      token = "<no-token>"
     }
   }
 
diff --git 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/AccessTokenCredentials.scala
similarity index 69%
copy from 
google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
copy to 
google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/AccessTokenCredentials.scala
index 6bff78f49..7e15eb3f0 100644
--- 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
+++ 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/AccessTokenCredentials.scala
@@ -25,25 +25,25 @@ import java.util
 import scala.concurrent.{ ExecutionContext, Future }
 
 @InternalApi
-private[connectors] object NoCredentials {
-
-  def apply(c: Config): NoCredentials = 
NoCredentials(c.getString("project-id"), c.getString("token"))
-
+private[auth] object AccessTokenCredentials {
+  def apply(c: Config): AccessTokenCredentials = 
AccessTokenCredentials(c.getString("project-id"), c.getString("token"))
 }
 
 @InternalApi
-private[connectors] final case class NoCredentials(projectId: String, token: 
String) extends Credentials {
+private[auth] final case class AccessTokenCredentials(projectId: String, 
accessToken: String) extends Credentials
+    with RetrievableCredentials {
 
-  private val futureToken = Future.successful(OAuth2BearerToken(token))
+  private val futureToken = Future.successful(OAuth2BearerToken(accessToken))
 
   override def get()(implicit ec: ExecutionContext, settings: 
RequestSettings): Future[OAuth2BearerToken] =
     futureToken
 
   override def asGoogle(implicit ec: ExecutionContext, settings: 
RequestSettings): GoogleCredentials =
     new GoogleCredentials {
-      override def getAuthenticationType: String = "<none>"
-      override def getRequestMetadata(uri: URI): util.Map[String, 
util.List[String]] = util.Collections.emptyMap()
-      override def hasRequestMetadata: Boolean = false
+      override def getAuthenticationType: String = "OAuth2"
+      override def getRequestMetadata(uri: URI): util.Map[String, 
util.List[String]] =
+        util.Collections.singletonMap("Authorization", 
util.Collections.singletonList(accessToken))
+      override def hasRequestMetadata: Boolean = true
       override def hasRequestMetadataOnly: Boolean = true
       override def refresh(): Unit = ()
     }
diff --git 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
index 7d1ec3e24..61d169b2f 100644
--- 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
+++ 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
@@ -56,17 +56,26 @@ object Credentials {
                 creds
               } catch {
                 case NonFatal(ex3) =>
-                  log.warning("Unable to find Application Default Credentials 
for Google APIs")
-                  log.warning("Service account: {}", ex1.getMessage)
-                  log.warning("Compute Engine: {}", ex2.getMessage)
-                  log.warning("User access: {}", ex3.getMessage)
-                  parseNone(c) // TODO Once credentials are guaranteed to be 
managed centrally we can throw an error instead
+                  try {
+                    val creds = parseAccessToken(c)
+                    log.info("Using access token credentials")
+                    creds
+                  } catch {
+                    case NonFatal(ex4) =>
+                      log.warning("Unable to find Application Default 
Credentials for Google APIs")
+                      log.warning("Service account: {}", ex1.getMessage)
+                      log.warning("Compute Engine: {}", ex2.getMessage)
+                      log.warning("User access: {}", ex3.getMessage)
+                      log.warning("Access token: {}", ex4.getMessage)
+                      parseNone(c) // TODO Once credentials are guaranteed to 
be managed centrally we can throw an error instead
+                  }
               }
           }
       }
     case "service-account" => parseServiceAccount(c)
     case "compute-engine"  => parseComputeEngine(c)
     case "user-access"     => parseUserAccess(c)
+    case "access-token"    => parseAccessToken(c)
     case "none"            => parseNone(c)
   }
 
@@ -83,6 +92,8 @@ object Credentials {
   private def parseUserAccess(c: Config)(implicit system: 
ClassicActorSystemProvider) =
     UserAccessCredentials(c.getConfig("user-access"))
 
+  private def parseAccessToken(c: Config) = 
AccessTokenCredentials(c.getConfig("access-token"))
+
   private def parseNone(c: Config) = NoCredentials(c.getConfig("none"))
 
   private var _cache: Map[Any, Credentials] = ListMap.empty
@@ -96,6 +107,11 @@ object Credentials {
 
 }
 
+@DoNotInherit
+private[google] trait RetrievableCredentials {
+  private[google] def get()(implicit ec: ExecutionContext, settings: 
RequestSettings): Future[HttpCredentials]
+}
+
 /**
  * Credentials for accessing Google APIs
  */
@@ -104,8 +120,6 @@ abstract class Credentials private[auth] () {
 
   private[google] def projectId: String
 
-  private[google] def get()(implicit ec: ExecutionContext, settings: 
RequestSettings): Future[HttpCredentials]
-
   /**
    * Wraps these credentials as a [[com.google.auth.Credentials]] for interop 
with Google's Java client libraries.
    * @param ec the [[scala.concurrent.ExecutionContext]] to use for blocking 
requests if credentials are requested synchronously
diff --git 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
index 6bff78f49..599d5f4f6 100644
--- 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
+++ 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala
@@ -27,18 +27,12 @@ import scala.concurrent.{ ExecutionContext, Future }
 @InternalApi
 private[connectors] object NoCredentials {
 
-  def apply(c: Config): NoCredentials = 
NoCredentials(c.getString("project-id"), c.getString("token"))
+  def apply(c: Config): NoCredentials = 
NoCredentials(c.getString("project-id"))
 
 }
 
 @InternalApi
-private[connectors] final case class NoCredentials(projectId: String, token: 
String) extends Credentials {
-
-  private val futureToken = Future.successful(OAuth2BearerToken(token))
-
-  override def get()(implicit ec: ExecutionContext, settings: 
RequestSettings): Future[OAuth2BearerToken] =
-    futureToken
-
+private[connectors] final case class NoCredentials(projectId: String) extends 
Credentials {
   override def asGoogle(implicit ec: ExecutionContext, settings: 
RequestSettings): GoogleCredentials =
     new GoogleCredentials {
       override def getAuthenticationType: String = "<none>"
diff --git 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala
 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala
index 15260873a..7ffaa385b 100644
--- 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala
+++ 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2Credentials.scala
@@ -34,7 +34,8 @@ private[auth] object OAuth2Credentials {
 }
 
 @InternalApi
-private[auth] abstract class OAuth2Credentials(val projectId: String)(implicit 
mat: Materializer) extends Credentials {
+private[auth] abstract class OAuth2Credentials(val projectId: String)(implicit 
mat: Materializer) extends Credentials
+    with RetrievableCredentials {
 
   private val tokenStream = stream.run()
 
diff --git 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala
 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala
index 1ad022d88..e7556ab59 100644
--- 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala
+++ 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala
@@ -23,6 +23,7 @@ import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
 import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal 
}
 import pekko.http.scaladsl.{ Http, HttpExt }
 import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, 
RequestSettings, RetrySettings }
+import pekko.stream.connectors.google.auth.RetrievableCredentials
 import pekko.stream.connectors.google.util.Retry
 import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow }
 
@@ -166,10 +167,15 @@ private[connectors] final class GoogleHttp private (val 
http: HttpExt) extends A
 
   private def addAuth(request: HttpRequest)(implicit settings: 
GoogleSettings): Future[HttpRequest] = {
     implicit val requestSettings: RequestSettings = settings.requestSettings
-    settings.credentials
-      .get()
-      .map { token =>
-        request.addHeader(Authorization(token))
-      }(ExecutionContexts.parasitic)
+    settings.credentials match {
+      case retrievable: RetrievableCredentials =>
+        retrievable
+          .get()
+          .map { token =>
+            request.addHeader(Authorization(token))
+          }(ExecutionContexts.parasitic)
+      case _ =>
+        Future.successful(request)
+    }
   }
 }
diff --git a/google-common/src/test/resources/application.conf 
b/google-common/src/test/resources/application.conf
index d2eab0de4..0c6ab64d3 100644
--- a/google-common/src/test/resources/application.conf
+++ b/google-common/src/test/resources/application.conf
@@ -3,8 +3,8 @@
 pekko.connectors.google {
 
   credentials {
-    provider = none
-    none {
+    provider = access-token
+    access-token {
       project-id = "pekko-connectors-google-test"
       token = "yyyy.c.an-access-token"
     }
diff --git 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala
 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala
index cc37d94f2..fa0babf4b 100644
--- 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala
+++ 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala
@@ -25,13 +25,13 @@ import pekko.http.scaladsl.model._
 import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
 import pekko.http.scaladsl.settings.ConnectionPoolSettings
 import pekko.http.scaladsl.{ HttpExt, HttpsConnectionContext }
-import pekko.stream.connectors.google.auth.{ Credentials, 
GoogleOAuth2Exception }
+import pekko.stream.connectors.google.auth.{ Credentials, 
GoogleOAuth2Exception, RetrievableCredentials }
 import pekko.stream.connectors.google.implicits._
 import pekko.stream.connectors.google.{ GoogleHttpException, GoogleSettings, 
RequestSettings }
 import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
 import pekko.testkit.TestKit
 import org.mockito.ArgumentMatchers.{ any, anyInt, argThat }
-import org.mockito.Mockito.when
+import org.mockito.Mockito.{ when, withSettings }
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
@@ -160,7 +160,8 @@ class GoogleHttpSpec
 
       final class AnotherException extends RuntimeException
 
-      val credentials = mock[Credentials]
+      val credentials =
+        mock[Credentials with 
RetrievableCredentials](withSettings().extraInterfaces(classOf[RetrievableCredentials]))
       when(credentials.get()(any[ExecutionContext], 
any[RequestSettings])).thenReturn(
         Future.failed(GoogleOAuth2Exception(ErrorInfo())),
         Future.failed(new AnotherException))
diff --git 
a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
 
b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
index ca6c18820..8d4e010c6 100644
--- 
a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
+++ 
b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
@@ -83,8 +83,6 @@ class FcmSenderSpec
       val request: HttpRequest = captor.getValue
       Unmarshal(request.entity).to[FcmSend].futureValue shouldBe 
FcmSend(false, FcmNotification.empty)
       request.uri.toString should 
startWith("https://fcm.googleapis.com/v1/projects/projectId/messages:send")
-      request.headers.size shouldBe 1
-      request.headers.head should matchPattern { case 
HttpHeader("authorization", "Bearer <no-token>") => }
     }
 
     "parse the success response correctly" in {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to