David, Patrick-

Following up, here's a working sample of the concepts mentioned previously.
The self-contained, runnable Gist is here: 
https://gist.github.com/lancearlaus/e6e52fc8c7ca534cb026#file-akka-user-stream-oauth2-scala

The flow materializes an (Agent, Cancellable) tuple. The Agent contains a 
Future that can be used to get the currently valid Access Token suitable 
for making outbound requests for protected resources. The auto refresh 
process can be cancelled using the materialized Cancellable. If not, it'll 
continue to chug away requesting new access tokens periodically.

The flow itself is, of course, designed to handle automatic refresh, but 
also automatic expiration. Should a new Access Token not arrive by the time 
the current token expires, a future to the next token will be set on the 
Agent. This should force dependent flows to naturally pause until the new, 
valid token arrives, thus avoiding using old, expired tokens.

I would recommend placing any retry and timeout logic outside the auto 
refresh flow. For example, retrying requests to the token refresh endpoint 
should go in the request flow that is used by the auto refresh flow. 
Similarly, any timeout logic for waiting on the access token future should 
go in the outbound request flow(s).

Note that I added a persist sink to the auto refresh flow. This sink 
receives the updated Refresh Token and should persist it somewhere since 
you'll need the last Refresh Token to initiate the auto refresh flow again 
once it's been stopped.

Assuming this is part of a larger system, you'll likely want to store the 
materialized pair in some map, or similar, data structure for retrieval and 
use in flows and for cancelling the auto refresh when no longer needed.

I also included a sample of how to use the Agent in creating a flow that 
decorates outbound requests with the current Access Token.

Thanks again for raising a nice, meaty issue that leverages a variety of 
concepts.
I'll likely use this flow as the basis for my next blog post.

Hope it helps. Feel free to sling more questions.

Lance

// A simple implementation of the Akka Cancellable trait
class AtomicCancellable extends Cancellable {
  private val cancelled = new AtomicBoolean()
  override def cancel() = cancelled.compareAndSet(false, true)
  override def isCancelled() = cancelled.get
}
object AtomicCancellable {
  def apply() = new AtomicCancellable
}

// Sample OAuth refresh token/response related classes
abstract class Token(prefix: String, sequence: Long) {
  val code: String = prefix + sequence
}
case class RefreshToken(sequence: Long) extends Token("R", sequence) {
  def this() = this(0)
  def next = this.copy(sequence = sequence + 1)
}
case class AccessToken private (sequence: Long, expiresIn: FiniteDuration) 
extends Token("A", sequence) {
  def this(token: RefreshToken, expiresIn: FiniteDuration) = 
this(token.sequence, expiresIn)
}

case class RefreshResponse(access: AccessToken, refresh: 
Option[RefreshToken])


// Creates a sink that materializes an Agent for obtaining the current 
Access Token
// The sink accepts a single element, the initial Refresh Token, and
// initiates an internal flow that automatically requests new Access Tokens
// and updates the materialized Agent accordingly
// The paired Cancellable can be used to stop the automatic update process
// Callers should provide a valid persist sink, typically backed by a 
database
// or other persistent store, to save the latest refresh token value
// The lead argument is the lead time to request the next Access Token 
prior to
// the current token's expiration
def autoRefresh(
    request: Flow[RefreshToken, Future[RefreshResponse], _], 
    persist: Sink[RefreshToken, _],
    lead: FiniteDuration = 30.seconds
  )(implicit materializer: Materializer, system: ActorSystem) : 
Sink[RefreshToken, (Agent[Future[AccessToken]], Cancellable)] = {

  Sink.head[RefreshToken].mapMaterializedValue { futureInitial =>

    implicit val executionContext = materializer.executionContext
    val first = Promise[AccessToken]
    val agent = Agent(first.future)
    val cancellable = AtomicCancellable()

    // Create the auto refresh flow that will run independently
    // to periodically update the agent with the current Access Token
    val auto = Sink(
      Flow[(RefreshToken, Promise[AccessToken])],
      Merge[(RefreshToken, Promise[AccessToken])](2), 
      request, 
      Unzip[RefreshToken, Promise[AccessToken]],
      Zip[Future[RefreshResponse], Promise[AccessToken]],
      Broadcast[(RefreshResponse, Promise[AccessToken])](3)
    )((mat, _, _, _, _, _) => mat) {
      implicit b => (initial, merge, request, unzip, zip, bcast) =>

      // Detect and handle cancellation
      // The splitWhen diverts the flow of elements upon cancellation,
      // ending the refresh process and allowing us to handle cleanup
      // so we don't leave a dangling, uncompleted promise
      val cancel = b.add(Flow[(RefreshToken, Promise[AccessToken])]
        .splitWhen(_ => cancellable.isCancelled)
        .prefixAndTail(1).map { 
          case (prefix, tail) => {
            tail.map(_.map {
              case (_, promise) => promise.failure(new Exception("auto 
refresh cancelled"))
            })
            prefix.head
          }
        }.flatten(FlattenStrategy.concat))

      // Complete current promise and create next promise
      val promise = b.add(Flow[(Future[RefreshResponse], 
Promise[AccessToken])]
        .map {
          case (fresponse, cur) => {
            cur.completeWith(fresponse.map(_.access))

            val next = Promise[AccessToken]            
            // Update agent upon promise completion
            // Note that this a side effect, hence the andThen
            next.future.andThen { case _ => agent.send(next.future) }

            (fresponse, next)
          }
        })

      // Unwrap the completed response future
      val response = b.add(Flow[(Future[RefreshResponse], 
Promise[AccessToken])]
        .map { case (fr, p) => Source(fr.map(r => (r, p))) }
        .flatten(FlattenStrategy.concat))

      // Save the updated refresh token, if supplied
      val save = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
        .collect { case (RefreshResponse(_, Some(refresh)), _) => refresh }
        .to(persist))

      // Send the next future to the agent upon expiration of current to
      // prevent users of the agent from using expired tokens
      // Uses the Akka after pattern to schedule the send
      val expiration = b.add(Sink.foreach[(RefreshResponse, 
Promise[AccessToken])] {
        case (RefreshResponse(access, _), promise) => {
          after(access.expiresIn, system.scheduler)(Future {
            if (!promise.isCompleted) {
              agent.send(promise.future)
            }
          })
        }
      })

      // Feed back the refresh token after delay to initiate the next 
access token refresh
      // Uses the Akka after pattern to schedule the Future with configured 
lead time
      // prior to expiration
      val refresh = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
        .collect { 
          case (RefreshResponse(access, Some(refresh)), promise) => {
            val delay = access.expiresIn.minus(lead)
            val future = after(delay, 
system.scheduler)(Future.successful((refresh, promise)))
            Source(future)
          }
        }.flatten(FlattenStrategy.concat))

      // The rolling request flow, initiated with the initial refresh token
      initial ~> merge ~> cancel ~> unzip.in
                                    unzip.out0 ~> request ~> zip.in0 
                                    unzip.out1            ~> zip.in1
                                                             zip.out ~> 
promise ~> response ~> bcast ~> save
                                                                            
                   bcast ~> expiration
                                                                            
                   bcast ~> refresh
                 merge                                                     
                          <~ refresh

      initial.inlet

    }

    // Run the auto refresh flow
    val initial = futureInitial.map(refresh => (refresh, first))
    auto.runWith(Source(initial))

    // Return the (Agent, Cancellable) pair
    (agent, cancellable)
  
  }

}

// Create an Access Token source that always retrieves the latest value 
from an agent
def accessTokenSource(agent: Agent[Future[AccessToken]]): 
Source[AccessToken, Unit] = 
  Source.repeat().map(_ => Source(agent())).flatten(FlattenStrategy.concat)

// Flow that decorates requests with the current access token
def addAccessToken(tokens: Source[AccessToken, Unit]): Flow[HttpRequest, 
HttpRequest, Unit] =
  Flow(Flow[HttpRequest], tokens, Zip[HttpRequest, AccessToken])((mat, _, 
_) => mat) {
    implicit b => (requests, tokens, zip) =>

    val addHeader = b.add(Flow[(HttpRequest, AccessToken)].map {
      case (request, access) => request.withHeaders(new Authorization(new 
OAuth2BearerToken(access.code)))
    })

    requests ~> zip.in0
    tokens   ~> zip.in1
                zip.out ~> addHeader

    (requests.inlet, addHeader.outlet)
  }


class OAuthRefreshSpec extends FlatSpec with AkkaStreamsImplicits with 
Matchers with ScalaFutures {
 
  def createMockResponse(refresh: RefreshToken, expiresIn: FiniteDuration) 
= 
    RefreshResponse(new AccessToken(refresh, expiresIn), Some(refresh.next))

  "Auto refresh" should "generate access token" in {

    val initialRefresh = new RefreshToken()
    val expiresIn = 3.seconds
    val mockRequestFlow = Flow[RefreshToken].map(r => 
Future.successful(createMockResponse(r, expiresIn)))
    val persist = Sink.foreach[RefreshToken](r => println(s"Saving refresh 
token: $r"))

    val autoRefreshSink = autoRefresh(mockRequestFlow, persist, 1.second)
    val (agent, cancellable) = 
Source.single(initialRefresh).runWith(autoRefreshSink)

    whenReady(agent.get) { token =>
      val expected = new AccessToken(initialRefresh, expiresIn)
      token shouldBe expected
      cancellable.cancel()
    }

  }

}



On Friday, August 7, 2015 at 3:03:36 PM UTC-4, Lance Arlaus wrote:
>
> Hi Patrick-
>
> Nice to meet you as well.
>
> Can you help me understand what would start with an expired token? If 
> possible, this would seem to be a design flaw in the token publisher, not 
> the subscriber. The Access Token publisher should *always* be publishing 
> valid tokens, if at all possible. There should only be two ways that 
> doesn't happen - either the refresh endpoint is supplying bad tokens that 
> expire prematurely (shame on them), or we're using a token past its stated 
> lifetime (shame on us).
>
> To avoid using tokens past their stated lifetime, which is the case I 
> believe you're referring to, I would lean toward improving the design of 
> the refresh flow, not forcing the token subscribers to compensate. Nobody 
> likes a component that does half the job :)
> Here's a couple of possibilities:
>
>    - For the original design that was using pure streams to produce the 
>    tokens, adding another takeWhile transformation to the internal flow 
> should 
>    do the trick. The condition on the takeWhile will return true until token 
>    expiration, thus ensuring that expired tokens are never published. Note 
>    that there will be two takeWhile conditionals - one for expiration and one 
>    for retiring the current token once the next has arrived.
>    - For the Agent-based design, something similar can be done, assuming 
>    the Agent holds a Future and *not* the raw Access Token (see my previous 
>    post). The following simple logic, which can be implemented via streams, 
> if 
>    desired, should do the trick.
>       - Given: The Agent holds a currently valid Access Token via a 
>       Future that is near expiration
>       - Create a new Future[AccessToken] that will complete with the new 
>       value for the AccessToken. The future on a Promise is sometimes handy 
> for 
>       this.
>       - Upon completion, have the completion logic set the new Future on 
>       the Agent. So, when the new token comes in, clients of the agent will 
> see 
>       it.
>       - Upon expiration, set the new Future on the Agent if it hasn't 
>       completed. This ensures that expired tokens never get used. Clients of 
> the 
>       Agent will wait for the Future to complete (for a new token to arrive). 
> Of 
>       course, if the future has already completed, no need to set it - the 
>       completion logic will handle it.
>       - Note that the two steps above accomplish the same as the two 
>       takeWhile conditionals above
>       - Also note that this assumes you're retrieving the latest value 
>       from the Agent for each outbound request (not saving Access Tokens). If 
>       using streams, this means having a source backed by the current future 
>       contained in the Agent.
>       - You may want to set an appropriate timeout duration for clients 
>       of the Future[AccessToken]. A simple way to do this if you're using 
> streams 
>       would be Source(future).takeWithin(duration) to avoid stalling forever 
>       waiting for a token to arrive.
>    
> Hope that helps.
>
> Lance
>
> On Friday, August 7, 2015 at 1:00:51 PM UTC-4, Patrick Li wrote:
>>
>> Hi Lance,
>>
>> Nice to meet you, I am working with David on this problem, so let me jump 
>> in while he is sleeping :)
>>
>> I think one scenario of having an expired token (even with the constant 
>> refresh happening in the background), is if the system goes down for a 
>> period of time (either planned or otherwise) long enough for the current 
>> access token to expire, then when the system is up again, it will start 
>> with an expired token.
>>
>> Patrick
>>
>> On Friday, August 7, 2015 at 7:28:49 AM UTC-7, Lance Arlaus wrote:
>>>
>>> David-
>>>
>>> Excellent call on using an agent. It's a nice fit for this situation. My 
>>> only hesitation would be ensuring that the agent is properly initialized 
>>> with a valid token before you start slinging requests. The flow I 
>>> originally posted ensures that by way of the Access Token source, but it 
>>> can also be easily accomplished by having the agent hold a Future instead 
>>> of the token itself (and using that Future as the basis for a Source that 
>>> feeds into your request flow). More on this in a sec...
>>>
>>> As for using an independent Actor, that'll certainly work, though it's 
>>> not strictly necessary if you want to stay in Streams land. You can put a 
>>> Sink on the refresh flow that updates the agent. More readable and 
>>> maintainable, IMHO, but there may be other considerations. Of course, if 
>>> you go the flow route, you'll need some way to stop the refresh flow, if 
>>> need be, since it'll run ad infinitum. Shouldn't be hard.
>>>
>>> As for buffering considerations, you're right to highlight it but I 
>>> wouldn't worry too much. Buffering is strictly for performance and easy to 
>>> turn off. The refresh flow should have no buffering on the flow to ensure a 
>>> single outstanding request at any given time i.e. 
>>> flow.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
>>>
>>> Finally, on the expired tokens, can you please describe under what 
>>> specific circumstances you foresee where streams would "sometimes fail"? 
>>> Given that tokens will be refreshed, and a new token made available prior 
>>> to expiration, how would the expired token scenario manifest? Forgive my 
>>> insistence on this point, but failure modes tend to increase, not diminish, 
>>> when responsibility diffuses across multiple components, especially in 
>>> asynchronous systems.
>>>
>>> Coming back to distribution of the Access Token via an Agent holding a 
>>> Future, this may be a good option for accomplishing two design objectives. 
>>> First, eliminating initialization timing considerations, as described 
>>> above. Second, handling retry delays in the refresh flow. Any delay in 
>>> obtaining a new token will naturally pause consumers without the need for 
>>> further coordination. Of course, consumers can dial in their tolerance for 
>>> delay by selecting the appropriate timeout (using takeWithin, for example) 
>>> and interpret a failed Access Token Future accordingly.
>>>
>>> Thanks for the interesting design problem you've raised. I'll try to 
>>> break some of this down into code if I get the chance.
>>>
>>> Lance
>>>
>>> On Thursday, August 6, 2015 at 10:17:11 PM UTC-4, David Pinn wrote:
>>>>
>>>> Wow, Lance! You have blown my mind. You are so right about treating a 
>>>> token refresh as a part of normal program flow, and not an error 
>>>> condition. 
>>>> I'm spending today getting this right.
>>>>
>>>> I have a couple of thoughts on implementation. I'm wondering if Akka 
>>>> Agents can help us out here. They're designed to hold a value, potentially 
>>>> shared between multiple components, the value of which is accessed 
>>>> synchronously, and altered asynchronously and atomically. Sounds perfect 
>>>> for holding the OAuth tokens. Rather than weave a source of access tokens 
>>>> into each stream that I set up, perhaps I can give each stream a reference 
>>>> to an Agent<OAuth2Info>. Whenever the stream executes its HTTP GET request 
>>>> against the source system, it grabs the latest value from the agent. An 
>>>> independent AuthorizationManager actor could refresh the OAuthInfo in each 
>>>> of those agents at the appropriate time. I'm thinking that this might be 
>>>> simpler, and would avoid any potential complications caused by buffering 
>>>> within the stream.
>>>>
>>>> It seems to me that streams will nevertheless sometimes fail because of 
>>>> expired access tokens. Maybe the system time gets out of sync in such a 
>>>> way 
>>>> that the AuthorizationManager is late with its refresh. Or maybe the 
>>>> system 
>>>> gets shut down and re-started, and the refresh operations haven't caught 
>>>> up 
>>>> yet. Whatever. Maybe the streams of data from external systems should, in 
>>>> the case that they get a 401 UNAUTHORIZED from the source, emit an event 
>>>> on 
>>>> the global Akka event bus, and the AuthorizationManager should listen for 
>>>> those events and fire off a refresh. The data streams should just wait 30 
>>>> seconds before retrying, and only stop() if that second attempt fails.
>>>>
>>>> Thank you, thank you for helping me work through this problem.
>>>>
>>>> David
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 7 August 2015 at 01:12, Lance Arlaus <lance....@gmail.com> wrote:
>>>>
>>>>> David-
>>>>>
>>>>> What if we pivot the problem slightly? Instead of framing token 
>>>>> refresh as a reaction to failure, what if we consider it a normal, 
>>>>> adjacent 
>>>>> flow?
>>>>>
>>>>> In other words, assume there's a source that provides a continuous 
>>>>> stream of Access Tokens. Within the expiration window of a given Access 
>>>>> Token, the source will always return the same token, as many times as 
>>>>> needed. Once the Access Token expires, or shortly before, a refresh 
>>>>> request 
>>>>> will be initiated and the newly minted Access Token seamlessly emitted 
>>>>> from 
>>>>> the source.
>>>>>
>>>>> Accessing a protected resource would then be a simple matter of adding 
>>>>> the current Access Token to any outgoing request. The two concerns, 
>>>>> generating Access Tokens and retrieving protected resources, are then 
>>>>> duly 
>>>>> separated. Failure to retrieve an Access Token should halt request 
>>>>> processing, but failure to retrieve a protected resource should never 
>>>>> trigger a token refresh (you can assume you have a valid token).
>>>>>
>>>>> Following is some semi-code that demonstrates what I'm talking about. 
>>>>> Note that the function signatures are missing an implicit here and 
>>>>> there. I'll try to get this into the form of a working Gist if I can get 
>>>>> the time, but I wanted to get this out there to give you the idea.
>>>>>
>>>>> Side note, I would reserve supervision for genuine error conditions, 
>>>>> not as a means of executing a standard flow. Doing otherwise is akin to 
>>>>> using exceptions in Java land to govern normal business flow.
>>>>>
>>>>> Let me know if you have questions.
>>>>>
>>>>> Lance
>>>>>
>>>>> abstract class Token(prefix: String, sequence: Long) {
>>>>>   def code: String = prefix + sequence
>>>>> }
>>>>> case class RefreshToken(sequence: Long) extends Token("R", sequence) {
>>>>>   def next = this.copy(sequence = sequence + 1)
>>>>> }
>>>>> case class AccessToken private (sequence: Long, expiresIn: Duration) 
>>>>> extends Token("A", sequence) {
>>>>>   def this(token: RefreshToken, expiresIn: Duration) = 
>>>>> this(token.sequence, expiresIn)
>>>>> }
>>>>>
>>>>> case class RefreshResponse(access: AccessToken, refresh: 
>>>>> Option[RefreshToken])
>>>>>
>>>>>
>>>>> // The basic OAuth refresh token request/response flow (section 6 of 
>>>>> the OAuth 2.0 spec)
>>>>> // This flow is the fundamental source of access and refresh tokens
>>>>> // and would be a real HTTP flow in a working example
>>>>> def refreshRequest(endpoint: URL, clientId: String, clientSecret: 
>>>>> String): Flow[RefreshToken, Future[RefreshResponse], Unit] = {
>>>>>   // val expiresIn = 10.minutes
>>>>>   // This would be the real flow that makes the request to the OAuth 
>>>>> refresh endpoint
>>>>>   // using Http().singleRequest(...), for example
>>>>>   Flow[RefreshToken].map(refresh => 
>>>>> Future.successful(RefreshResponse(new AccessToken(refresh, 10.minutes), 
>>>>> Some(refresh.next))))
>>>>> }
>>>>>
>>>>> // Executes a rolling series of refresh requests with appropriate 
>>>>> delay to serve as the
>>>>> // basis for a continuous stream of Access Tokens
>>>>> // The pair of futures emitted represent the current and next Access 
>>>>> Tokens, respectively
>>>>> // The current Access Token should be used until the future for the 
>>>>> next Access Token completes
>>>>> def refreshFlow(initial: RefreshToken, request: Flow[RefreshToken, 
>>>>> Future[RefreshResponse], Unit]): Source[(Future[AccessToken], 
>>>>> Future[AccessToken])] = {
>>>>>   Source(
>>>>>     Source(initial, Promise[AccessToken]), 
>>>>>     request, 
>>>>>     Merge[(RefreshToken, Promise[AccessToken])](2), 
>>>>>     Unzip[RefreshToken, Promise[AccessToken]],
>>>>>     Zip[RefreshResponse, Promise[AccessToken]],
>>>>>     Broadcast(2))((mat, _, _, _, _, _) => mat)
>>>>>   {
>>>>>     implicit b => (initial, request, merge, unzip, zip, bcast) =>
>>>>>
>>>>>     // Complete current promise and create next promise
>>>>>     val promise = b.add(Flow[(Future[RefreshResponse], 
>>>>> Promise[AccessToken])].map {
>>>>>       case (response, promise) => {
>>>>>         promise.completeWith(response.map(_.access))
>>>>>         (response, Promise[AccessToken])
>>>>>       }
>>>>>     })
>>>>>
>>>>>     // Feeds back the refresh token after delay to initiate the next 
>>>>> access token refresh
>>>>>     // Uses the Akka after pattern to schedule the Future about 30 
>>>>> seconds prior to expiration
>>>>>     val feedback = b.add(Flow[(Future[RefreshResponse], 
>>>>> Promise[AccessToken])].map {
>>>>>       case (response, promise) => {
>>>>>         Source(response).collect { 
>>>>>           case RefreshResponse(access, Some(refresh)) => {
>>>>>             val delay = access.expiresIn.minus(30.seconds)
>>>>>             val future = after(delay, scheduler)(() => 
>>>>> Future.successful(refresh, promise))
>>>>>             Source(future)
>>>>>           }
>>>>>         }.flatten(FlattenStrategy.concat)
>>>>>       }
>>>>>     })
>>>>>
>>>>>     // Output the current/next token future pair
>>>>>     val output = b.add(Flow[(Future[RefreshResponse], 
>>>>> Promise[AccessToken])].map {
>>>>>       case (response, promise) => (response.map(_.access), 
>>>>> promise.future)
>>>>>     })
>>>>>
>>>>>     // The rolling request flow, initiated with the initial refresh 
>>>>> token
>>>>>     initial ~> merge ~> unzip.in
>>>>>                         unzip.left ~> request ~> zip.in0 
>>>>>                         unzip.right           ~> zip.in1
>>>>>                                                  zip.out ~> promise ~> 
>>>>> bcast ~> feedback
>>>>>                merge                                                   
>>>>>       <~ feedback
>>>>>                                                                       
>>>>>  bcast ~> output
>>>>>
>>>>>     output.outlet
>>>>>   }
>>>>> }
>>>>>
>>>>> // Converts a source of (current, next) Access Token future pairs into
>>>>> // a continuous stream of Access Tokens, switching to the next token, 
>>>>> when available
>>>>> def accessTokenSource(source: Source[(Future[AccessToken], 
>>>>> Future[AccessToken])]): Source[AccessToken, Unit] = {
>>>>>   source.map {
>>>>>     case (current, next) => Source(current).map(token => 
>>>>> Source.repeat(token).takeWhile(!next.isCompleted))
>>>>>   }.flatten(FlattenStrategy.concat)
>>>>> }
>>>>>
>>>>>
>>>>> // Decorate requests with the current access token
>>>>> def protected(tokens: Source[AccessToken]): Flow[HttpRequest, 
>>>>> HttpRequest] = {
>>>>>   Flow[HttpRequest].map(_.withHeaders(new 
>>>>> OAuth2BearerToken(AccessToken.code)))
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wednesday, August 5, 2015 at 9:46:05 AM UTC-4, David Pinn wrote:
>>>>>>
>>>>>> I have an application that pulls data from an external system using 
>>>>>> HTTP GET requests, the headers of which include an OAuth 2.0 access 
>>>>>> token. 
>>>>>> I'm trying to work out what to do when access tokens expire, as they do 
>>>>>> from time to time.
>>>>>>
>>>>>> I suppose I could let the stream complete with failure, and the class 
>>>>>> that created the stream could detect that the failure was due to token 
>>>>>> expiry, and it could do the OAuth 2.0 refresh token dance to get a new 
>>>>>> access token, and then re-create the stream. It feels wrong, though, 
>>>>>> because what about all the nice little elements in transit at the time 
>>>>>> of 
>>>>>> the failure? They'll just fall on the floor and die. Sad.
>>>>>>
>>>>>> And then I read about supervision strategies, and I'm thinking Yeah! 
>>>>>> that's right, I only need to restart the stage, and all the nice little 
>>>>>> elements can just wait while I do the refresh dance. But then I don't 
>>>>>> like 
>>>>>> the idea of having the decider function - the function that chooses the 
>>>>>> Supervision.Directive, have side effects like OAuth 2.0 refresh dances 
>>>>>> and 
>>>>>> missile launches and stuff. So then I read about Custom Stream 
>>>>>> Processing: 
>>>>>> PushPullStage, and PushStage, and in the Akka Streams code I found a 
>>>>>> thingy 
>>>>>> called AsyncStage - maybe I could write a custom stage that extends that.
>>>>>>
>>>>>> I feel like I'm in a room with eight closed doors, and behind one is 
>>>>>> a land flowing with milk and honey, or nice little elements at least, 
>>>>>> and 
>>>>>> behind all of the others is ritual humiliation. 
>>>>>>
>>>>>> Please help me choose. Where do I put the OAuth 2.0 refresh token 
>>>>>> dance?
>>>>>>
>>>>> -- 
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: 
>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: 
>>>>> https://groups.google.com/group/akka-user
>>>>> --- 
>>>>> You received this message because you are subscribed to a topic in the 
>>>>> Google Groups "Akka User List" group.
>>>>> To unsubscribe from this topic, visit 
>>>>> https://groups.google.com/d/topic/akka-user/qh1ktrdbjbE/unsubscribe.
>>>>> To unsubscribe from this group and all its topics, send an email to 
>>>>> akka-user+...@googlegroups.com.
>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to