Hi,

void output(DataStream<List<SuperclassEvent>> inputStream) {
These seems odd. Are your events intended to be a list? If not, this should be 
a `DataStream<SuperclassEvent>`.

From the code snippet you’ve attached in the first post, it seems like you’ve 
initialized your source incorrectly.

`env.fromElements(List<...>)` will take the whole list as a single event, thus 
your source is only emitting a single list as a record. Perhaps what you 
actually want to do here is `env.fromCollection(List<...>)`?

This should also eliminate the situation that “only after the whole List is 
processed can the records then be sent to their respective sinks”, as you 
mentioned in your reply.

but this doesn't seem very ideal to us because it requires a new operator to 
first split the stream
IMO, this wouldn’t really introduce noticeable overhead, as the operator will 
be chained to the map operator. Side outputs is also the preferred way here, as 
side outputs subsume stream splitting.



Overall, I think it is reasonable to do a map -> split -> Kafka / Cassandra 
sinks in your case, given that you’ve declared the source correctly to be a 
single SuperclassEvent as a record.

The operator overhead is fairly trivial if it is chained. Another reason to use 
sinks properly is that only then will you benefit from the exactly-once / 
at-least-once delivery guarantees to external systems (which requires 
collaboration between the sink and Flink’s checkpointing).

Hope this helps!

Cheers,
Gordon


On 17 July 2017 at 2:59:38 AM, earellano [via Apache Flink User Mailing List 
archive.] (ml+s2336050n14294...@n4.nabble.com) wrote:

Tzu-Li (Gordon) Tai wrote
It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is 
there any specific reason why you want to emit elements to Kafka in a map 
function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream<String> someStream = …
someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, “topic”, 
schema, props);
The reason we want to use processElement() & a map function, instead of 
someStream.addSink() is that our output logic has conditional depending on the 
type of record we have.

Our overall program follows this path:

  Serialized JSON consumed from Kafka: DataStream<byte []>
  parsed, producing a List of successful events and/or error events: 
DataStream<List<Events>>
  outputted conditionally, going to Kafka or Cassandra depending on which type 
of event it is.


This is our code for output logic (although modified types to not use our IP):

void output(DataStream<List<SuperclassEvent>> inputStream) {
    inputStream.map( eventList ->
      for (SuperclassEvent  event : eventList) {
         if (event instanceof SuccessEvent)
            emitToCassandra(event);
         else if (event instanceof ErrorEvent)
            emitToKafka(event);
       }
       return true;  // we don't actually want to return anything, just don't 
know how else to use map
);

 
That is, we have sinks for both Kafka and Cassandra, and want to be able to 
iterate through our List<SuperclassElement> and conditionally send each 
individual record to its appropriate sink depending on its type.

I know Flink offers SplitStreams for a similar purpose, but this doesn't seem 
very ideal to us because it requires a new operator to first split the stream, 
and only after the whole List is processed can the records then be sent to 
their respective sinks. Whereas the code above sends the records to their sinks 
immediately upon finding its type.  

--

Is there any way to make processElement() work so that we can work on 
individual records instead of the whole DataStream? Or are we misusing Flink? 
How do you recommend doing this the best way possible?


--

Also, if processElement() and invoke() aren't meant to be used, should they be 
made package private? Happy to make a pull request if so, although fear that 
might break a few things.

If you reply to this email, your message will be added to the discussion below:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14294.html
This email was sent by earellano (via Nabble)
To receive all replies by email, subscribe to this discussion

Reply via email to