Hi Julio,

Thanks for this great example. I was able to reproduce it on my machine and found the problem.

You need to store the newly created branch of your pipeline in a variable, e.g. `val test = pipeline.process()`, and then access the side outputs via `test.getSideOutput(outputSimple)`. Right now your program expects a side output from the wrong operator (namely the window operation).
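
A minimal sketch of the idea, not your actual job: `SideOutputSketch`, `evenTag` and the `Int` elements are made up for illustration, and `pipeline` simply stands in for the stream that comes out of your window operation. It assumes the Flink Scala streaming API is on the classpath.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputSketch {
  // Hypothetical tag, for illustration only.
  val evenTag: OutputTag[Int] = OutputTag[Int]("even")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the stream produced by the window operation.
    val pipeline: DataStream[Int] = env.fromElements(1, 2, 3, 4)

    // Keep a reference to the operator that actually emits the side output.
    val split: DataStream[Int] = pipeline.process(new ProcessFunction[Int, Int] {
      override def processElement(
          value: Int,
          ctx: ProcessFunction[Int, Int]#Context,
          out: Collector[Int]): Unit = {
        out.collect(value)                              // main output
        if (value % 2 == 0) ctx.output(evenTag, value)  // side output
      }
    })

    // Read the tag from the stream returned by process() ...
    split.getSideOutput(evenTag).print()

    // ... and not from the upstream stream, which never emits evenTag:
    // pipeline.getSideOutput(evenTag).print()

    env.execute("side output sketch")
  }
}
```

In the Scala API `getSideOutput` is defined on `DataStream` itself, so the commented-out call also compiles; it just never receives any data.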

Regards,
Timo


On 04.04.18 at 16:35, Julio Biason wrote:
Hey Timo,

To be completely honest, I _think_ they are POJO, although I use case classes (because I want our data to be immutable).

I wrote a sample code, which basically reflects our pipeline: https://github.com/jbiason/FlinkSample/blob/master/src/main/scala/net/juliobiason/SideoutputSample.scala

The thing to notice is that we do the split to side outputs _after_ the window functions -- because we want to split the results just before the sinks (we had a split there instead, but the job would, sometimes, crash because "splits can't be used with side outputs", or something along those lines). Are we correct in assuming that there can't be side outputs once a window is processed?

On Tue, Apr 3, 2018 at 10:17 AM, Timo Walther <twal...@apache.org> wrote:

    Hi Julio,

    I tried to reproduce your problem locally but everything ran
    correctly. Could you share a small example job with us?

    This worked for me:

    class TestingClass {
       var hello: Int = 0
    }

    class TestA extends TestingClass {
       var test: String = _
    }

    def main(args: Array[String]) {

       // set up the execution environment
       val env = StreamExecutionEnvironment.getExecutionEnvironment

       // get input data
       val text = env.fromElements(WordCountData.WORDS: _*)

       val outputTag = OutputTag[(String, Int)]("side-output")
       val outputTag2 = OutputTag[TestingClass]("side-output2")

       val counts: DataStream[(String, Int)] = text
         // split up the lines in pairs (2-tuples) containing: (word,1)
         .flatMap(_.toLowerCase.split("\\W+"))
         .filter(_.nonEmpty)
         .map((_, 1))
         // group by the tuple field "0" and sum up tuple field "1"
         .keyBy(0)
         .sum(1)
         .process(new ProcessFunction[(String, Int), (String, Int)] {
           override def processElement(
               value: (String, Int),
               ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
               out: Collector[(String, Int)]): Unit = {
             ctx.output(outputTag, value)
             ctx.output(outputTag2, new TestingClass)
             ctx.output(outputTag2, new TestA)
           }
         })

       counts.getSideOutput(outputTag).print()
       counts.getSideOutput(outputTag2).print()

       // execute program
       env.execute("Streaming WordCount")
    }


    Are the Metric classes proper POJO types?

    Regards,
    Timo


    On 02.04.18 at 21:53, Julio Biason wrote:
    Hey guys,

    I have a pipeline that generates two different types of data (but
    both use the same trait) and I need to save each on a different sink.

    So far, things were working with splits, but it seems that using
    splits together with side outputs (for the late data, into which
    we'll plug late-arrival handling) causes errors, so I changed
    everything to side outputs.

    To select a side output based on type, I did the following:

    class MetricTypeSplitter(accountingTag:OutputTag[Metric],
    analysingTag:OutputTag[Metric]) extends ProcessFunction[Metric,
    Metric] {

      val logger = LoggerFactory.getLogger(this.getClass)

      override def processElement(
        value:Metric,
        ctx:ProcessFunction[Metric, Metric]#Context,
        out:Collector[Metric]
      ): Unit = {
        out.collect(value)
        value match {
          case record:AccountingMetric => {
            logger.info(s"Sending ${record} to Accounting")
            ctx.output(accountingTag, record)
          }
          case record:AnalysingMetric => {
            logger.info(s"Sending ${record} to Analysis")
            ctx.output(analysingTag, record)
          }
          case _ => {
            logger.error(s"Don't know the type of ${value}")
          }
        }
      }
    }

    And at the end of the pipeline I add the splitter:

        pipeline
          .process(new MetricTypeSplitter(accountTag, analysisTag))

    So far, this works and I can see the log lines showing which tag
    each metric is being sent to. The second part, in which I
    capture the side output and send the data to the sink, doesn't
    seem to work, though:

        pipeline
          .getSideOutput(accountTag)
          .map { tuple => AccountingSink.toRow(tuple) }
          .name("Accounting rows")
          .writeUsingOutputFormat(accountingSink.output)
          .name(s"${accountingSink}")

    And here is the problem: it seems .getSideOutput() never
    actually receives the side output, because the logger in
    AccountingSink.toRow() is never triggered and the data does not
    show up in our database (toRow() converts the Metric to a Row and
    accountingSink.output returns the JDBCOutputFormat).

    Any ideas what I need to do for side outputs to be actually captured?

    --
    *Julio Biason*, Software Engineer
    *AZION*  |  Deliver. Accelerate. Protect.
    Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554






