Hi tambunanw,

This issue is already known and we'll patch it soon [1].
The problem should be fixed in the next release (maybe 0.9.1).

Regards,
Chiwan Park

[1] https://issues.apache.org/jira/browse/FLINK-2257

> On Jul 3, 2015, at 4:57 PM, tambunanw <if05...@gmail.com> wrote:
> 
> Hi All, 
> 
> I'm experimenting with a rich windowing function and operator state.
> I modified the streaming stock prices example from
> 
> https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
> 
> I created a simple windowing function like the one below:
> 
> class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] {
>  println("created")
>  private var counter = 0
> 
>  override def open(conf: Configuration): Unit = {
>    println("opened")
>  }
> 
>  override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = {
>    // if not initialized ..
> 
>    println(counter)
>    println(values)
>    counter = counter + 1
> 
>  }
> }
> 
> However, the open() method is not invoked when I run this code in my
> local environment:
> 
>    spx
>      .groupBy(x => x.symbol)
>      .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
>      .mapWindow(new MyWindowFunction)
> 
> Any thoughts on this one?
> 
> 
> Cheers
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.




