I'm trying to build a sample application using Flink that does the following: 1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue 2. For each symbol performs a real-time lookup of current prices and streams the values
The program compiles fine but I get the following run-time error message: "The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields". I suspect the problem is due to the way I'm accessing the StreamExecutionEnvironment. Could someone please provide pointers to how I can use values from a data stream to create a new streaming data source? Any response is appreciated. Relevant code snippet is provided below: public class RetrieveStockPrices { @SuppressWarnings("serial") public static void main(String[] args) throws Exception { final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "stocks"); DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); streamOfStockSymbols.map(new MapFunction<String, String> () { @Override public String map(String stockSymbol) throws Exception { DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); stockPrices.print(); return null; } }); streamExecEnv.execute("Retrieve Stock Prices"); } } public class LookupStockPrice extends RichSourceFunction<String> { public String stockSymbol = null; public boolean isRunning = true; public LookupStockPrice(String inSymbol) { stockSymbol = inSymbol; } @Override public void open(Configuration parameters) throws Exception { isRunning = true; } @Override public void cancel() { isRunning = false; } @Override public void run(SourceFunction.SourceContext<String> ctx) throws Exception { while (isRunning) { //TODO: query Google Finance API ctx.collect("12.5"); } } } -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-do-I-use-values-from-a-data-stream-to-create-a-new-streaming-data-source-tp10680.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.