Hey Timo,

To be completely honest, I _think_ they are POJOs, although I use case
classes (because I want our data to be immutable).

I wrote some sample code that basically mirrors our pipeline:
https://github.com/jbiason/FlinkSample/blob/master/src/main/scala/net/juliobiason/SideoutputSample.scala

The thing to notice is that we split into side outputs _after_ the
window functions, because we want to split the results just before the
sinks (we used a split there before, but the job would sometimes crash
because "splits can't be used with side outputs", or something along those
lines). Are we correct in assuming that there can't be side outputs once a
window has been processed?
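
To make the shape concrete, here is a minimal sketch (not our real code) of a
window aggregation followed by a ProcessFunction that emits to a side output.
It assumes the same Scala DataStream API used elsewhere in this thread. The
object name, the tag name, the sample tuples and the "sum greater than 5" rule
are all made up for illustration, and countWindow is used only so that a small
bounded input actually fires the windows. One detail the sketch relies on:
getSideOutput is called on the stream returned by .process(...), not on the
stream that .process(...) was called on.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical example object; none of these names come from our pipeline.
object SideOutputAfterWindowSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical tag for results whose windowed sum is greater than 5.
    val largeTag = OutputTag[(String, Int)]("large")

    // Window aggregation first: sum field 1 over every 2 elements per key.
    val windowed: DataStream[(String, Int)] = env
      .fromElements(("a", 1), ("b", 2), ("a", 3), ("b", 4))
      .keyBy(_._1)
      .countWindow(2)
      .sum(1)

    // Split after the window: keep the result of .process(...) in a val.
    val split: DataStream[(String, Int)] = windowed
      .process(new ProcessFunction[(String, Int), (String, Int)] {
        override def processElement(
            value: (String, Int),
            ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
            out: Collector[(String, Int)]): Unit = {
          // Every element still goes to the main output.
          out.collect(value)
          // Elements matching the made-up rule also go to the side output.
          if (value._2 > 5) ctx.output(largeTag, value)
        }
      })

    // The side output is read from the operator that emitted it.
    split.getSideOutput(largeTag).print()

    env.execute("Side output after window (sketch)")
  }
}
```

If the result of .process(...) is discarded and getSideOutput is called on the
upstream stream instead, the tag is looked up on an operator that never emits
to it.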

On Tue, Apr 3, 2018 at 10:17 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Julio,
>
> I tried to reproduce your problem locally but everything ran correctly.
> Could you share a little example job with us?
>
> This worked for me:
>
> class TestingClass {
>   var hello: Int = 0
> }
> class TestA extends TestingClass {
>   var test: String = _
> }
> def main(args: Array[String]) {
>
>   // set up the execution environment
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>   // get input data
>   val text = env.fromElements(WordCountData.WORDS: _*)
>
>   val outputTag = OutputTag[(String, Int)]("side-output")
>   val outputTag2 = OutputTag[TestingClass]("side-output2")
>
>   val counts: DataStream[(String, Int)] = text
>     // split up the lines in pairs (2-tuples) containing: (word,1)
>     .flatMap(_.toLowerCase.split("\\W+"))
>     .filter(_.nonEmpty)
>     .map((_, 1))
>     // group by the tuple field "0" and sum up tuple field "1"
>     .keyBy(0)
>     .sum(1)
>       .process(new ProcessFunction[(String, Int), (String, Int)] {
>         override def processElement(
>             value: (String, Int),
>             ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
>             out: Collector[(String, Int)]): Unit = {
>           ctx.output(outputTag, value)
>           ctx.output(outputTag2, new TestingClass)
>           ctx.output(outputTag2, new TestA)
>         }
>       })
>
>   counts.getSideOutput(outputTag).print()
>   counts.getSideOutput(outputTag2).print()
>
>   // execute program
>   env.execute("Streaming WordCount")
> }
>
>
> Are the Metric classes proper POJO types?
>
> Regards,
> Timo
>
>
> On 02.04.18 at 21:53, Julio Biason wrote:
>
> Hey guys,
>
> I have a pipeline that generates two different types of data (both extend
> the same trait), and I need to save each type to a different sink.
>
> So far, things were working with splits, but it seems that using splits
> together with side outputs (for the late data, to which we'll attach late
> arrival handling) causes errors, so I changed everything to side outputs.
>
> To select a side output based on type, I did the following:
>
> class MetricTypeSplitter(accountingTag:OutputTag[Metric],
> analysingTag:OutputTag[Metric]) extends ProcessFunction[Metric, Metric] {
>
>   val logger = LoggerFactory.getLogger(this.getClass)
>
>   override def processElement(
>     value:Metric,
>     ctx:ProcessFunction[Metric, Metric]#Context,
>     out:Collector[Metric]
>   ): Unit = {
>     out.collect(value)
>     value match {
>       case record:AccountingMetric => {
>         logger.info(s"Sending ${record} to Accounting")
>         ctx.output(accountingTag, record)
>       }
>       case record:AnalysingMetric => {
>         logger.info(s"Sending ${record} to Analysis")
>         ctx.output(analysingTag, record)
>       }
>       case _ => {
>         logger.error(s"Don't know the type of ${value}")
>       }
>     }
>   }
> }
>
> And at the end of the pipeline I add the splitter:
>
>     pipeline
>       .process(new MetricTypeSplitter(accountTag, analysisTag))
>
> So far, this works: I can see the log lines showing which tag each metric
> is being sent to. The second part, in which I capture the side
> output and send the data to the sink, doesn't seem to work, though:
>
>     pipeline
>       .getSideOutput(accountTag)
>       .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
>       .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")
>
>
>
> And here is the problem: it seems .getSideOutput() never actually
> receives the side output, because the logger call in AccountingSink.toRow()
> never fires and the data does not show up in our database (toRow()
> converts the Metric to a Row and accountingSink.output returns the
> JDBCOutputFormat).
>
> Any ideas what I need to do for side outputs to be actually captured?
>
> --
> *Julio Biason*, Software Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
>
>
>


