Re: Open method is not called with custom implementation RichWindowMapFunction
I found that the patch has been merged upstream. [1] :)

Regards,
Chiwan Park

[1] https://github.com/apache/flink/pull/855

On Jul 3, 2015, at 5:26 PM, Welly Tambunan if05...@gmail.com wrote:

Thanks Chiwan,

Glad to hear that.

Cheers

On Fri, Jul 3, 2015 at 3:24 PM, Chiwan Park chiwanp...@apache.org wrote:

Hi tambunanw,

The issue is already known and we’ll patch it soon. [1] In the next release (maybe 0.9.1), the problem will be solved.

Regards,
Chiwan Park

[1] https://issues.apache.org/jira/browse/FLINK-2257

On Jul 3, 2015, at 4:57 PM, tambunanw if05...@gmail.com wrote:

Hi All,

I'm experimenting with a rich windowing function and operator state. I modified the streaming stock prices example from

https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala

I created a simple windowing function like the one below:

    class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] {
      // Runs in the constructor, when the function object is created
      println("created")

      private var counter = 0

      // Expected to run once before the first window is processed
      override def open(conf: Configuration): Unit = {
        println("opened")
      }

      override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = {
        // if not initialized ..
        println(counter)
        println(values)
        counter = counter + 1
      }
    }

However, the open() method is not invoked when I run this code in my local environment:

    spx
      .groupBy(x => x.symbol)
      .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
      .mapWindow(new MyWindowFunction)

Any thoughts on this one?

Cheers

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com
Re: Open method is not called with custom implementation RichWindowMapFunction
Thanks Chiwan,

Glad to hear that.

Cheers

--
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com
http://www.triplelands.com/blog/
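Until a release containing the fix is available, the "// if not initialized .." comment in the original function hints at one possible workaround: guard the setup so it also runs on the first window if open() was never called. The sketch below is not from the thread and does not use the Flink API. RichWindowFn, CountingWindowFn, StockPrice, and the sample values are hypothetical stand-ins that only mimic the open()/mapWindow() shape, so the example runs without Flink on the classpath.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a rich window function; NOT the Flink interface.
trait RichWindowFn[I, O] {
  def open(): Unit = ()
  def mapWindow(values: Iterable[I], out: O => Unit): Unit
}

// Hypothetical record type, loosely modelled on the example in the thread.
final case class StockPrice(symbol: String, price: Double)

class CountingWindowFn extends RichWindowFn[StockPrice, StockPrice] {
  private var initialized = false
  private var counter = 0
  private var setups = 0

  // Idempotent setup shared by open() and mapWindow().
  private def ensureInitialized(): Unit =
    if (!initialized) {
      setups += 1
      initialized = true
    }

  override def open(): Unit = ensureInitialized()

  override def mapWindow(values: Iterable[StockPrice], out: StockPrice => Unit): Unit = {
    ensureInitialized() // covers the case where the runtime skipped open()
    counter += 1
    values.foreach(out)
  }

  def windowsSeen: Int = counter
  def setupRuns: Int = setups
}

object LazyInitDemo {
  def main(args: Array[String]): Unit = {
    val fn = new CountingWindowFn
    val sink = ArrayBuffer.empty[StockPrice]

    // Deliberately never call fn.open(), imitating the reported behaviour.
    fn.mapWindow(Seq(StockPrice("SPX", 100.0)), p => { sink += p; () })
    fn.mapWindow(Seq(StockPrice("SPX", 101.0)), p => { sink += p; () })

    println(s"windows=${fn.windowsSeen} setups=${fn.setupRuns} emitted=${sink.size}")
  }
}
```

Because the guard is idempotent, the setup runs exactly once whether or not open() is called, so the same function should behave identically once the patched release invokes open() as expected.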