Hi Dan,

In case you also want to keep automatic UID assignment, we do something like
this (Scala):

// Assumes scala.collection.JavaConverters._ (for asScala) and
// org.apache.flink.streaming.api.graph.StreamNode are imported; env, jobName,
// forceOperatorUid and logger are members of the enclosing class.
override def run(args: ApplicationArguments): Unit = {
  require(jobName != null,
    "a specific jobName needs to be configured, if hosted in Spring Boot, " +
      "configure 'flink.job.name' in application.yaml !")

  // Log at error level when a missing uid blocks the job, at warn level otherwise.
  def report(message: String): Unit =
    if (forceOperatorUid) logger.error(message) else logger.warn(message)

  // Build the stream graph without clearing the transformations, so that
  // env.execute(...) below can still use them.
  val graph = env.getStreamGraph(jobName, false)
  var missingUid = false
  for (node: StreamNode <- graph.getStreamNodes.asScala) {
    if (node.getTransformationUID == null) {
      missingUid = true
      val message = s"Operator[${node.getId}: ${node.getOperatorName}]: " +
        "Missing uid(...) for state migration"
      println(message)
      report(message)
    }
  }

  val exPlan = env.getExecutionPlan
  val planMessage = s"job execution plan: \n$exPlan"
  if (missingUid) report(planMessage) else logger.info(planMessage)

  println()
  println()
  println("job execution plan:")
  println()
  println(exPlan)
  println()
  println()

  // Refuse to start the job when uids are mandatory and at least one is missing.
  if (forceOperatorUid) {
    require(!missingUid,
      "Job not executed because of configuration parameter: " +
        s"flink.job.forceOperatorUid: $forceOperatorUid (for state migration)")
  }

  env.execute(jobName)
}


That also gives us a little more explicit diagnostics.
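
For anyone who wants to try the warn-or-fail logic without a Flink setup, here is a minimal, dependency-free sketch in Java. The class UidCheck and its nested Op type are hypothetical stand-ins for the three StreamNode fields the check reads (id, operator name, uid); nothing in it is Flink API, and the messages are only modelled on the ones above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UidCheck {
    // Hypothetical stand-in for the StreamNode fields the check reads.
    static final class Op {
        final int id;
        final String name;
        final String uid; // null when no uid(...) was set on the operator

        Op(int id, String name, String uid) {
            this.id = id;
            this.name = name;
            this.uid = uid;
        }
    }

    // Returns one diagnostic line per operator that has no explicit uid.
    static List<String> missingUids(List<Op> ops) {
        List<String> messages = new ArrayList<>();
        for (Op op : ops) {
            if (op.uid == null) {
                messages.add("Operator[" + op.id + ": " + op.name
                        + "]: Missing uid(...) for state migration");
            }
        }
        return messages;
    }

    // Mirrors the forceOperatorUid switch: report only, or refuse to run.
    static void check(List<Op> ops, boolean forceOperatorUid) {
        List<String> messages = missingUids(ops);
        messages.forEach(System.err::println);
        if (forceOperatorUid && !messages.isEmpty()) {
            throw new IllegalStateException(
                    messages.size() + " operator(s) without uid; job not executed");
        }
    }

    public static void main(String[] args) {
        List<Op> ops = Arrays.asList(
                new Op(1, "Source: events", "events-source"),
                new Op(2, "Filter", null));
        check(ops, false); // reports operator 2 on stderr, then continues
    }
}
```

Calling check(ops, true) on the same list throws instead of continuing, which corresponds to the require(...) in front of env.execute(jobName).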

Hope this helps 😊

Thias



From: Dan Hill <quietgol...@gmail.com>
Sent: Monday, 6 December 2021 05:03
To: Chesnay Schepler <ches...@apache.org>
Cc: user <user@flink.apache.org>
Subject: Re: Any way to require .uid(...) calls?

Thanks!

On Sun, Dec 5, 2021 at 1:23 PM Chesnay Schepler <ches...@apache.org> wrote:
I'm not sure if there is a configuration option for doing so, but the 
generation of UIDs can be disabled via 
ExecutionConfig#disableAutoGeneratedUIDs, which would fail a job if not all 
operators have a UID.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableAutoGeneratedUIDs();

On 05/12/2021 21:43, Dan Hill wrote:
Hi.

I want to make sure we keep consistent uids on my Flink operators.  Is there a 
way to require uids?  It's pretty easy to add operators and not have explicit 
uids on them.

Also, I noticed an issue (Flink v1.12.3) where a filter operator does not chain 
when it's between a ProcessFunction and a cogroup window operator.  I can't get 
a uid set on this map.  I've tried a few variations and haven't been able to 
chain it.





