@Raghu Thanks for the explanation. I had already realized after Aljoscha's comment that Kafka enforces this data model. I was too deep in Flink land, where we usually only use the value part (although it is also possible to set the key). Good to hear you agree on factoring the watermark/timestamp extraction API methods out into a separate interface so that they can be reused across sources.
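To make the idea concrete, here is a minimal sketch of what such a shared extraction interface could look like. All names here are illustrative assumptions, not actual Beam API; the point is only that timestamp and watermark logic can live behind one interface that any source implementation reuses.

```java
// Hypothetical sketch, NOT the actual Beam API: timestamp/watermark
// extraction factored out so it can be shared across sources.
public class WatermarkSketch {

    /** Illustrative interface; real names in Beam may differ. */
    interface TimestampExtractor<T> {
        /** Event-time timestamp of a record, in epoch millis. */
        long extractTimestamp(T record);

        /** Watermark: a promise that no later record is earlier than this. */
        long getCurrentWatermark();
    }

    public static void main(String[] args) {
        // A toy extractor that tracks the maximum timestamp seen so far
        // and uses it as the watermark.
        TimestampExtractor<long[]> extractor = new TimestampExtractor<>() {
            long maxSeen = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(long[] record) {
                maxSeen = Math.max(maxSeen, record[0]);
                return record[0];
            }

            @Override
            public long getCurrentWatermark() {
                return maxSeen;
            }
        };

        extractor.extractTimestamp(new long[]{1000L});
        extractor.extractTimestamp(new long[]{2000L});
        System.out.println(extractor.getCurrentWatermark()); // 2000
    }
}
```

A source would delegate to such an extractor instead of hard-wiring the logic, which is what would let KafkaIO and other unbounded sources share it.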
@Frances Sources are not simple DoFns. They add functionality such as checkpointing, watermark generation, and split creation. If we want sinks to be portable, we should think about a dedicated interface, at least for the checkpointing. To come back to the original question, I think we have reached a consensus that we don't want a 'useNative()' method on Beam sources :)

On Fri, Apr 29, 2016 at 5:14 PM, Raghu Angadi <rang...@google.com.invalid> wrote:
> On Fri, Apr 29, 2016 at 2:11 AM, Maximilian Michels <m...@apache.org> wrote:
>
>> Further, the KafkaIO enforces a <key, value> data model which AFAIK is
>> not enforced by the Beam model. I don't know the details for this
>> design decision but I would like this to be communicated before it is
>> merged into the master.
>>
>
> The <key, value> structure comes from Kafka. It is not related to Beam or
> any runner. I am not sure I understood the concern here correctly. Every
> Kafka record
> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html#ConsumerRecord(java.lang.String,%20int,%20long,%20K,%20V)>
> has a key, a value, and some metadata like offset and partition. KafkaIO
> directly reflects that.
>
> As most users don't need the Kafka metadata, we added '.withoutMetadata()
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L130>'
> which returns just <key, value>.
>
> One can argue that many Kafka consumers don't need the key either. We
> debated adding another helper method '.dropKey()', but left it out. Users
> can easily drop keys in Beam:
>
>     pipeline.apply(KafkaIO.read()....withoutMetaData())
>             .apply(Values.<String>create())
>
>> Otherwise, it will be harder to achieve
>> portability among the Runners. In Flink and Spark we are already
>> experienced with all kinds of connectors and users' needs. It would be
>> nice to feed that back in the course of adding new connectors to the
>> Beam API.
>>
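For anyone following along without a Beam setup: the key-dropping step Raghu quotes is just a per-element projection from <key, value> to value. A standalone sketch of what `Values.<String>create()` does conceptually (using plain `Map.Entry` as a stand-in for Beam's `KV`, which is an assumption for illustration only):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DropKeysSketch {
    public static void main(String[] args) {
        // Simulated Kafka <key, value> records, as after .withoutMetadata()
        List<Map.Entry<String, String>> records = List.of(
            new SimpleEntry<>("user-1", "click"),
            new SimpleEntry<>("user-2", "view"));

        // Conceptual equivalent of .apply(Values.<String>create()):
        // keep only the value of each pair.
        List<String> values = records.stream()
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

        System.out.println(values); // [click, view]
    }
}
```

In the actual pipeline this runs element-by-element over the PCollection rather than over an in-memory list, but the projection is the same.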