Hi tambunanw,

This is a known issue [1], and we will patch it soon. The fix should land in the next release (maybe 0.9.1).

Regards,
Chiwan Park

[1] https://issues.apache.org/jira/browse/FLINK-2257

> On Jul 3, 2015, at 4:57 PM, tambunanw <if05...@gmail.com> wrote:
>
> Hi All,
>
> I'm trying to create some experiment with rich windowing function and
> operator state. I modify the streaming stock prices from
>
> https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
>
> I create the simple windowing function like below
>
> class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] {
>   println("created")
>   private var counter = 0
>
>   override def open(conf: Configuration): Unit = {
>     println("opened")
>   }
>
>   override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = {
>     // if not initialized ..
>
>     println(counter)
>     println(values)
>     counter = counter + 1
>
>   }
> }
>
> However the open() method is not invoked when i try to run this code on my
> local environment
>
> spx
>   .groupBy(x => x.symbol)
>   .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
>   .mapWindow(new MyWindowFunction)
>
> Any thought on this one ?
>
>
> Cheers
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at
> Nabble.com.