Hi Gabor,

The code is a very simple Kafka consumption of data. I guess it may be the cluster. Can you please point out the possible problems to look for in the cluster?
Regards,
Amit

On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> + Adding back user list.
>
> I've had a look at the Spark code and it's not modifying
> "partition.assignment.strategy", so the problem must be either in your
> application or in your cluster setup.
>
> G
>
> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> It's super interesting because that field has a default value:
>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>
>> On Mon, 7 Dec 2020, 10:51 Amit Joshi, <mailtojoshia...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply.
>>> I did try removing the client version, but got the same exception.
>>>
>>> Thanks
>>>
>>> On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> +1 on the mentioned change. Spark uses the following kafka-clients
>>>> library:
>>>>
>>>> <kafka.version>2.4.1</kafka.version>
>>>>
>>>> G
>>>>
>>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <gschiavonsp...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I think the issue is that you are overriding the kafka-clients that
>>>>> comes with <artifactId>spark-sql-kafka-0-10_2.12</artifactId>.
>>>>>
>>>>> I'd try removing the kafka-clients and see if it works.
>>>>>
>>>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi <mailtojoshia...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am running Spark Structured Streaming along with Kafka.
>>>>>> Below is the pom.xml:
>>>>>>
>>>>>> <properties>
>>>>>>     <maven.compiler.source>1.8</maven.compiler.source>
>>>>>>     <maven.compiler.target>1.8</maven.compiler.target>
>>>>>>     <encoding>UTF-8</encoding>
>>>>>>     <!-- Put the Scala version of the cluster -->
>>>>>>     <scalaVersion>2.12.10</scalaVersion>
>>>>>>     <sparkVersion>3.0.1</sparkVersion>
>>>>>> </properties>
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>     <artifactId>kafka-clients</artifactId>
>>>>>>     <version>2.1.0</version>
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-core_2.12</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>>     <scope>provided</scope>
>>>>>> </dependency>
>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-sql_2.12</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>>     <scope>provided</scope>
>>>>>> </dependency>
>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>> </dependency>
>>>>>>
>>>>>> I am building the fat jar with the shade plugin.
>>>>>> The jar runs as expected in my local setup with the command:
>>>>>>
>>>>>> spark-submit --master local[*] --class com.stream.Main --num-executors 3
>>>>>> --driver-memory 2g --executor-cores 2 --executor-memory 3g
>>>>>> prism-event-synch-rta.jar
>>>>>>
>>>>>> But when I try to run the same jar in a Spark cluster using YARN with
>>>>>> the command:
>>>>>>
>>>>>> spark-submit --master yarn --deploy-mode cluster --class com.stream.Main
>>>>>> --num-executors 4 --driver-memory 2g --executor-cores 1
>>>>>> --executor-memory 4g gs://jars/prism-event-synch-rta.jar
>>>>>>
>>>>>> I am getting this exception:
>>>>>>
>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
>>>>>> Caused by: org.apache.kafka.common.config.ConfigException: Missing required
>>>>>> configuration "partition.assignment.strategy" which has no default value.
>>>>>> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>>>>>>
>>>>>> I have tried setting "partition.assignment.strategy", but it is still
>>>>>> not working.
>>>>>>
>>>>>> Please help.
>>>>>>
>>>>>> Regards,
>>>>>> Amit Joshi
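[Editor's note: following German's and Gabor's suggestion above, the usual fix is to delete the explicit kafka-clients 2.1.0 dependency so that spark-sql-kafka-0-10_2.12 resolves its own matching kafka-clients 2.4.1. A sketch of the trimmed dependency, assuming no other module in the project pins kafka-clients directly:]

```xml
<!-- Sketch: the explicit kafka-clients 2.1.0 override is removed entirely;
     spark-sql-kafka-0-10_2.12 brings in its own compatible kafka-clients
     (2.4.1 for Spark 3.0.1) as a transitive dependency. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>${sparkVersion}</version>
</dependency>
```

[The version actually resolved into the fat jar can be checked with `mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients` before rebuilding.]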