Re: multiple streams with multiple actions - proper way?

2017-08-01 Thread Fabian Hueske
Hi Peter,

this kind of use case is supported, but it is best practice to split
independent pipelines into individual jobs.
One reason for that is to isolate failures and restarts.
For example, I would split the program you posted into two programs, one
for the "foo" topic and one for the "bar" topic. Depending on the complexity
of the operations, you might also want to split it further.
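A minimal sketch of that split follows. It assumes the Flink 1.3-era Scala API and the Kafka 0.10 connector from the posted program. The object names KafkaSources, FooJob and BarJob are hypothetical. The restart strategy values are illustrative only; they show that each job can now choose its own failure handling.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical shared helper so both jobs use the same Kafka settings.
object KafkaSources {
  def kafkaProps(): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "vmi:9092")
    prop
  }

  // Builds a String-typed Kafka 0.10 source for a single topic.
  def consumer(topic: String): FlinkKafkaConsumer010[String] =
    new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), kafkaProps())
}

// Job 1 (hypothetical name): only the "foo" pipeline. It is submitted on its own,
// so a failure here restarts this job without touching the "bar" pipeline.
object FooJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Illustrative values: 3 restart attempts, 10 seconds apart, for this job only.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))

    val ins = env.addSource(KafkaSources.consumer("foo"))
      .map(s => "transformation-1: " + s)

    ins.map(s => "transformation-2:" + s).print()
    ins.map(s => "transformation-3:" + s).print()
    ins.map(s => "transformation-4:" + s).print()

    env.execute("foo pipeline")
  }
}

// Job 2 (hypothetical name): only the "bar" pipeline, submitted separately.
object BarJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val ins2 = env.addSource(KafkaSources.consumer("bar"))
      .map(s => "transformation-5: " + s)

    ins2.map(s => "transformation-7:" + s).print()
    ins2.map(s => "transformation-6:" + s).print()

    env.execute("bar pipeline")
  }
}
```

Each object would then be submitted as its own job. For example, use `flink run -c FooJob app.jar` and `flink run -c BarJob app.jar`, where app.jar is a placeholder for your packaged program.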

Best, Fabian




2017-07-29 18:51 GMT+02:00 Peter Ertl wrote:

> Hello Flink People :-)
>
>
> I am trying to get my head around Flink. Is it a supported use case to
> register multiple streams, with possibly more than one transformation /
> action per stream?
>
>
> import java.util.Properties
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>
> // Wrapper object name is illustrative; only main() was in the post.
> object MultiStreamJob {
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val prop = new Properties()
>     prop.setProperty("bootstrap.servers", "vmi:9092")
>
>     // first stream
>     val ins = env.addSource(new FlinkKafkaConsumer010[String]("foo", new SimpleStringSchema(), prop))
>       .map(s => "transformation-1: " + s)
>
>     ins.map(s => "transformation-2:" + s).print() // one action
>     ins.map(s => "transformation-3:" + s).print() // one more action
>     ins.map(s => "transformation-4:" + s).print() // another action on the same stream
>
>     // second, different stream
>     val ins2 = env.addSource(new FlinkKafkaConsumer010[String]("bar", new SimpleStringSchema(), prop))
>       .map(s => "transformation-5: " + s)
>
>     ins2.map(s => "transformation-7:" + s).print() // action
>     ins2.map(s => "transformation-6:" + s).print() // different action
>
>     env.execute("run all streams with multiple actions attached")
>   }
> }
>
>
>
> Is this program abusing Flink, or is this just how you are supposed to do
> things?
>
> Also, how many threads will this program consume when running with
> parallelism = 4?
>
>
> Best regards
> Peter
>
>


multiple streams with multiple actions - proper way?

2017-07-29 Thread Peter Ertl
Hello Flink People :-)


I am trying to get my head around Flink. Is it a supported use case to
register multiple streams, with possibly more than one transformation / action
per stream?


import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Wrapper object name is illustrative; only main() was in the post.
object MultiStreamJob {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "vmi:9092")

    // first stream
    val ins = env.addSource(new FlinkKafkaConsumer010[String]("foo", new SimpleStringSchema(), prop))
      .map(s => "transformation-1: " + s)

    ins.map(s => "transformation-2:" + s).print() // one action
    ins.map(s => "transformation-3:" + s).print() // one more action
    ins.map(s => "transformation-4:" + s).print() // another action on the same stream

    // second, different stream
    val ins2 = env.addSource(new FlinkKafkaConsumer010[String]("bar", new SimpleStringSchema(), prop))
      .map(s => "transformation-5: " + s)

    ins2.map(s => "transformation-7:" + s).print() // action
    ins2.map(s => "transformation-6:" + s).print() // different action

    env.execute("run all streams with multiple actions attached")
  }
}


Is this program abusing Flink, or is this just how you are supposed to do things?

Also, how many threads will this program consume when running with parallelism = 4?


Best regards
Peter