Hello Flink People :-)

I am trying to get my head around Flink. Is it a supported use case to
register multiple streams in one job, each with possibly more than one
transformation / action?


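import java.util.Properties

// On Flink versions before 1.4 this class lives in
// org.apache.flink.streaming.util.serialization instead.
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

def main(args: Array[String]): Unit = {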

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val prop = new Properties()
  prop.setProperty("bootstrap.servers", "vmi:9092")
  
  // first stream
  val ins = env
    .addSource(new FlinkKafkaConsumer010("foo", new SimpleStringSchema(), prop))
    .map(s => "transformation-1: " + s)

  ins.map(s => "transformation-2:" + s).print() // one action
  ins.map(s => "transformation-3:" + s).print() // one more action
  ins.map(s => "transformation-4:" + s).print() // another action on the same stream

  // second, different stream
  val ins2 = env
    .addSource(new FlinkKafkaConsumer010("bar", new SimpleStringSchema(), prop))
    .map(s => "transformation-5: " + s)

  ins2.map(s => "transformation-7:" + s).print() // action
  ins2.map(s => "transformation-6:" + s).print() // different action
  
  env.execute("run all streams with multiple actions attached")
}


Is this program abusing Flink, or is this just how you are supposed to do things?

Also, how many threads will this program consume when running with
parallelism = 4?


Best regards
Peter
