Hi Flink users, may I ask what the better way to read multiple stream sources would be?
I have a FooSource which implements SourceFunction and reads one source, and I would like to read several of them. FooSource basically reads data as a stream through an HTTP call.

Option 1: Use a for-loop to create several data streams and union them. It looks like:

    List<DataStream<JSONObject>> streams = new ArrayList<>();

    // Create one stream per source name.
    for (String source : sourceList) {
        streams.add(env.addSource(new FooSource<>(source, new JSONSchema(), properties)).rebalance());
    }

    // Fold all streams into a single unioned stream.
    Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
    DataStream<JSONObject> currentStream = streamsIt.next();
    while (streamsIt.hasNext()) {
        currentStream = currentStream.union(streamsIt.next());
    }

Option 2: Implement a single SourceFunction (FooSources) that reads many FooSource instances. The implementation in FooSources looks like:

    @Override
    public void open(Configuration parameters) throws Exception {
        fooSourceList = new ArrayList<>();
        LOG.info("Opened");
        for (String sourceName : sourceNames) {
            FooSource fooSource = new FooSource(properties, sourceName);
            fooSource.open(parameters);
            fooSourceList.add(fooSource);
            LOG.info("Read source: " + sourceName);
        }
    }

    @Override
    public void run(final SourceContext<String> ctx) throws Exception {
        LOG.info("Processing");
        // This sequential loop won't work. Would a parallel for-loop
        // be fine from a performance point of view?
        for (FooSource fooSource : fooSourceList) {
            fooSource.run(ctx);
        }
    }

Best,
Sendoh

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-read-several-stream-sources-tp11224.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
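
To make the "parallel for-loop" idea from Option 2 concrete, here is a minimal plain-Java sketch of what I have in mind. It does not use Flink at all: BlockingSource is a hypothetical stand-in for FooSource.run(ctx), the Consumer<String> stands in for the SourceContext, and MultiSourceSketch, runAll, finiteSource and demo are names made up for illustration. The shared lock is only an assumption about how emissions would need to be serialized; in a real SourceFunction I would expect to synchronize on ctx.getCheckpointLock() instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class MultiSourceSketch {

    /** Hypothetical stand-in for a blocking source such as FooSource.run(ctx). */
    interface BlockingSource {
        void run(Consumer<String> out) throws Exception;
    }

    /** Runs every source on its own thread; emissions are serialized on one shared lock. */
    static void runAll(List<BlockingSource> sources, Consumer<String> out) throws Exception {
        if (sources.isEmpty()) {
            return; // nothing to run; also avoids creating a zero-sized pool
        }
        ExecutorService pool = Executors.newFixedThreadPool(sources.size());
        final Object lock = new Object(); // assumption: plays the role of the checkpoint lock
        Consumer<String> synchronizedOut = record -> {
            synchronized (lock) {
                out.accept(record);
            }
        };
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (BlockingSource source : sources) {
                // Returning null makes this lambda a Callable, so checked exceptions are allowed.
                futures.add(pool.submit(() -> {
                    source.run(synchronizedOut);
                    return null;
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // waits for the source and rethrows its failure, if any
            }
        } finally {
            pool.shutdownNow();
        }
    }

    /** Hypothetical finite source for the demo; a real one would loop over HTTP reads. */
    static BlockingSource finiteSource(String name, int count) {
        return out -> {
            for (int i = 0; i < count; i++) {
                out.accept(name + "-" + i);
            }
        };
    }

    /** Runs two finite sources in parallel and returns their records in sorted order. */
    static List<String> demo() throws Exception {
        List<String> collected = new ArrayList<>();
        runAll(Arrays.asList(finiteSource("a", 3), finiteSource("b", 2)), collected::add);
        Collections.sort(collected); // arrival order across threads is not deterministic
        return collected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The sketch uses one thread per source, so a single Flink subtask would hold as many threads as there are sources, whereas Option 1 leaves scheduling and parallelism to Flink. Cancellation is also not handled here; a real implementation would need cancel() to stop every inner source.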