Hi,

The large international bank I work for has a custom Kafka implementation.
The client libraries used to connect to Kafka perform extra security
steps.  These libraries implement the standard Kafka Consumer and Producer
interfaces, so once we use them to connect to Kafka, we can treat our
connections as the standard Kafka interfaces in our code.
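Because both the plain KafkaConsumer and the bank's secured implementation are
instances of the Consumer interface, code written against the interface is
agnostic to the concrete class.  A tiny illustration (names here are mine,
not from either library):

```scala
import org.apache.kafka.clients.consumer.Consumer

// Accepts any Consumer implementation: a plain KafkaConsumer or the
// bank's secured consumer, since both implement Consumer[K, V].
def pollOnce(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = {
  val records = consumer.poll(java.time.Duration.ofMillis(100))
  // ... process records
}
```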

We can't use the out-of-the-box Kafka connector from Structured Streaming
because it only allows the concrete KafkaConsumer class to be used.

Would it be possible / advisable / a good idea to change this to use the
Consumer interface and allow users to specify a callback somehow to create
their own connection to Kafka?

So the signature of this private method in InternalKafkaConsumer would
change to use the Consumer interface (as would the rest of the code base),
and users would somehow be given the option to create their own Consumer
if they wanted.  The same would apply for Producers.

/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
  ...
  val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParamsWithSecurity)
  ...
}
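To make the idea concrete, here is a rough sketch of what I have in mind.
ConsumerFactory, DefaultConsumerFactory, and the `factory` reference are all
hypothetical names of my own; nothing like this exists in Spark today:

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}

// Hypothetical extension point: users could register an implementation
// (e.g. by class name through a source option) to supply their own Consumer.
trait ConsumerFactory {
  def create(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

// Default factory preserving the current behaviour.
object DefaultConsumerFactory extends ConsumerFactory {
  def create(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] =
    new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
}

/** Create a Consumer to fetch records for `topicPartition` */
private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
  ...
  factory.create(kafkaParamsWithSecurity)
}
```

With a default factory in place, existing users would see no behaviour
change, while we could plug in our secured Consumer without forking Spark.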

At the moment we are left with two options: fork the Spark code base and
swap in our custom Consumer for the KafkaConsumer used in that function
(along with a few other changes), which leaves us with a codebase that
will drift out of sync over time; or build and maintain our own custom
connector.

Best regards,
Patrick
