[ https://issues.apache.org/jira/browse/FLINK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532593#comment-14532593 ]
ASF GitHub Bot commented on FLINK-1977: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29850458 --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java --- @@ -178,38 +182,45 @@ private void initializeConnection() { consumerIterator = stream.iterator(); } - /** - * Called to forward the data from the source to the {@link DataStream}. - * - * @param collector - * The Collector for sending data to the dataStream - */ @Override - public void run(Collector<OUT> collector) throws Exception { - isRunning = true; - try { - while (isRunning && consumerIterator.hasNext()) { - OUT out = schema.deserialize(consumerIterator.next().message()); - if (schema.isEndOfStream(out)) { - break; - } - collector.collect(out); + public void open(Configuration config) throws Exception { + initializeConnection(); + } + + @Override + public boolean reachedEnd() throws Exception { + if (nextElement != null) { + return true; + } else if (consumerIterator.hasNext()) { + OUT out = schema.deserialize(consumerIterator.next().message()); + if (schema.isEndOfStream(out)) { + return false; } - } finally { - consumer.shutdown(); + nextElement = out; } + return false; } @Override - public void open(Configuration config) throws Exception { - initializeConnection(); + public OUT next() throws Exception { + if (nextElement != null) { + OUT out = nextElement; + nextElement = null; + return out; + } + + MessageWithMetadata msg; + OUT out = schema.deserialize(consumerIterator.next().message()); + + if (schema.isEndOfStream(out)) { --- End diff -- Why are you throwing an exception here? 
The old code stopped reading on that event, without failing. > Rework Stream Operators to always be push based > ----------------------------------------------- > > Key: FLINK-1977 > URL: https://issues.apache.org/jira/browse/FLINK-1977 > Project: Flink > Issue Type: Improvement > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > > This is a result of the discussion on the mailing list. This is an excerpt > from the mailing list that gives the basic idea of the change: > I propose to change all streaming operators to be push-based, with a > slightly improved interface: In addition to collect(), which I would > call receiveElement(), I would add receivePunctuation() and > receiveBarrier(). The first operator in the chain would also get data > from the outside invokable that reads from the input iterator and > calls receiveElement() for the first operator in a chain. -- This message was sent by Atlassian JIRA (v6.3.4#6332)