I'm trying to build a sample application using Flink that does the following:
1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka topic
2. For each symbol, performs a real-time lookup of the current price and
streams the resulting values

The program compiles fine but I get the following run-time error message:
"The implementation of the MapFunction is not serializable. The object
probably contains or references non serializable fields".
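
To illustrate what I understand that message to mean, here is a minimal plain-Java sketch with no Flink dependency. It assumes the check boils down to ordinary Java serialization of the function object. The names ClosureSerializationDemo, Environment and Mapper are hypothetical stand-ins (for my job class, StreamExecutionEnvironment and MapFunction respectively), not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureSerializationDemo {

    // Hypothetical stand-in for a non-serializable object
    // such as StreamExecutionEnvironment.
    public static class Environment { }

    // Hypothetical stand-in for a serializable function interface
    // such as MapFunction.
    public interface Mapper extends Serializable {
        String map(String value);
    }

    // Returns true if plain Java serialization of the object succeeds.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The anonymous class uses the local variable env, so the compiler
    // stores it as a hidden field of the function object.
    public static Mapper capturingMapper() {
        final Environment env = new Environment();
        return new Mapper() {
            @Override
            public String map(String value) {
                return value + env.hashCode();
            }
        };
    }

    // Same shape, but nothing from the enclosing scope is referenced.
    public static Mapper plainMapper() {
        return new Mapper() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturingMapper())); // prints false
        System.out.println(isSerializable(plainMapper()));     // prints true
    }
}
```

If this picture is right, any function object that references a non-serializable local variable from the enclosing method would trigger the same kind of failure.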

I suspect the problem is due to the way I'm accessing the
StreamExecutionEnvironment from inside the MapFunction. Could someone please
provide pointers on how I can use values from a data stream to create a new
streaming data source? Any response is appreciated.

The relevant code snippet is provided below:

public class RetrieveStockPrices {

        @SuppressWarnings("serial")
        public static void main(String[] args) throws Exception {
                final StreamExecutionEnvironment streamExecEnv =
                                StreamExecutionEnvironment.getExecutionEnvironment();
                streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "localhost:9092");
                properties.setProperty("zookeeper.connect", "localhost:2181");
                properties.setProperty("group.id", "stocks");

                DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(
                                new FlinkKafkaConsumer08<String>("stocksymbol",
                                                new SimpleStringSchema(), properties));
                streamOfStockSymbols.map(new MapFunction<String, String>() {
                        @Override
                        public String map(String stockSymbol) throws Exception {
                                DataStream<String> stockPrices =
                                                streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
                                stockPrices.print();
                                return null;
                        }
                });

                streamExecEnv.execute("Retrieve Stock Prices");
        }

}


public class LookupStockPrice extends RichSourceFunction<String> {
        public String stockSymbol = null;
        // volatile because cancel() is invoked from a different thread than run()
        public volatile boolean isRunning = true;

        public LookupStockPrice(String inSymbol) {
                stockSymbol = inSymbol;
        }
        
        @Override
        public void open(Configuration parameters) throws Exception {
                isRunning = true;
        }


        @Override
        public void cancel() {
                isRunning = false;
        }

        @Override
        public void run(SourceFunction.SourceContext<String> ctx)
                        throws Exception {
                while (isRunning) {
                    //TODO: query Google Finance API
                    ctx.collect("12.5");
                }
        }
}
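
One alternative I am considering is to do the lookup inside the transformation itself, expanding each symbol into its price records, instead of creating a new source per symbol. Below is a rough plain-Java sketch of that shape, using java.util.stream only as a stand-in for the DataStream API. The class PerSymbolLookupSketch and the method lookupPrice are hypothetical placeholders, and the fixed "12.5" mirrors the TODO in LookupStockPrice. I have not verified how this maps onto Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PerSymbolLookupSketch {

    // Hypothetical placeholder for the real price lookup;
    // returns a fixed value, like the TODO in LookupStockPrice.
    public static String lookupPrice(String symbol) {
        return "12.5";
    }

    // Expands every incoming symbol into "symbol=price" records inside
    // the transformation, rather than opening a new source per symbol.
    public static List<String> pricesFor(List<String> symbols) {
        return symbols.stream()
                .flatMap(symbol -> Stream.of(symbol + "=" + lookupPrice(symbol)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [CSCO=12.5, FB=12.5]
        System.out.println(pricesFor(Arrays.asList("CSCO", "FB")));
    }
}
```

The point of the sketch is only that the function references nothing but its input element and a static helper, so it would not need to capture the execution environment.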




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-do-I-use-values-from-a-data-stream-to-create-a-new-streaming-data-source-tp10680.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.