Hi Srinivas,

Reading from different brokers is possible but you need to connect to each
Kafka cluster separately.
Trying to mix connections to two different Kafka clusters in one subscriber
is not supported. (I'm sure that it would give all kind of weird errors)
The  "kafka.bootstrap.servers" option is there to indicate the potential
many brokers of the *same* Kafka cluster.

The way to address this is following the suggestion of German to create a
subscriptions for each Kafka cluster you are talking to.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1, topic2")
 .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3, topicn, topicn+1,")
 .load()

After acquiring the DataFrame, you can union them and treat all the data
with a single process.

val unifiedData = df_cluster1.union(df_cluster2)
// apply further transformations on `unifiedData`

kr, Gerard.


:



On Tue, Jun 9, 2020 at 6:30 PM Srinivas V <srini....@gmail.com> wrote:

> Thanks for the quick reply. This may work but I have like 5 topics to
> listen to right now, I am trying to keep all topics in an array in a
> properties file and trying to read all at once. This way it is dynamic and
> you have one code block like below and you may add or delete topics from
> the config file without changing code. If someone confirms that it does not
> work, I would have to do something like you have provided.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> "cluster1_host:cluster1_port,cluster2_host:port")
>
> .option("subscribe", "topic1, topic2,topic3,topic4,topic5")
>
>

Reply via email to