Hi Gabor,

The code is a very simple Kafka consumer.
I guess the problem may be in the cluster.
Can you please point out what possible problems to look for in the cluster?

Regards
Amit
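
A possible first check, following the suggestion in the quoted replies that a
different kafka-clients jar may be involved, is to see which jar the Kafka
consumer classes are actually loaded from on the cluster compared with the
local run. The sketch below is only an illustration: ClasspathProbe and locate
are hypothetical names, and the two Kafka class names are assumed to be the
relevant ones. It uses only standard JDK calls and reports "not found" when
kafka-clients is not on the classpath.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Spark or Kafka): reports where a class is loaded from.
class ClasspathProbe {

    /**
     * Returns "className -> location" for the jar or directory the class was
     * loaded from, or a short message if it is missing or has no code source.
     */
    public static String locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                // Typically the case for core JDK classes.
                return className + " -> (no code source available)";
            }
            return className + " -> " + src.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not found on classpath";
        }
    }

    public static void main(String[] args) {
        // Assumed class names from the kafka-clients library.
        System.out.println(locate("org.apache.kafka.clients.consumer.KafkaConsumer"));
        System.out.println(locate("org.apache.kafka.clients.consumer.ConsumerConfig"));
    }
}
```

Calling ClasspathProbe.locate(...) at the start of the application's main, once
locally and once on YARN, and comparing the two outputs in the driver log might
show whether the cluster picks up a kafka-clients jar other than the one in the
fat jar.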

On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> + Adding back user list.
>
> I've had a look at the Spark code and it's not modifying
> "partition.assignment.strategy", so the problem must be either in your
> application or in your cluster setup.
>
> G
>
>
> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> It's super interesting because that field has a default value:
>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>
>> On Mon, 7 Dec 2020, 10:51 Amit Joshi, <mailtojoshia...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply.
>>> I tried removing the client version,
>>> but got the same exception.
>>>
>>>
>>> Thanks
>>>
>>> On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> +1 on the mentioned change. Spark uses the following kafka-clients
>>>> library version:
>>>>
>>>> <kafka.version>2.4.1</kafka.version>
>>>>
>>>> G
>>>>
>>>>
>>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
>>>> gschiavonsp...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I think the issue is that you are overriding the kafka-clients version that
>>>>> comes with <artifactId>spark-sql-kafka-0-10_2.12</artifactId>.
>>>>>
>>>>>
>>>>> I'd try removing the kafka-clients dependency and see if it works.
>>>>>
>>>>>
>>>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi <mailtojoshia...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am running Spark Structured Streaming with Kafka.
>>>>>> Below is the pom.xml:
>>>>>>
>>>>>> <properties>
>>>>>>     <maven.compiler.source>1.8</maven.compiler.source>
>>>>>>     <maven.compiler.target>1.8</maven.compiler.target>
>>>>>>     <encoding>UTF-8</encoding>
>>>>>>     <!-- Put the Scala version of the cluster -->
>>>>>>     <scalaVersion>2.12.10</scalaVersion>
>>>>>>     <sparkVersion>3.0.1</sparkVersion>
>>>>>> </properties>
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>     <artifactId>kafka-clients</artifactId>
>>>>>>     <version>2.1.0</version>
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-core_2.12</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>>     <scope>provided</scope>
>>>>>> </dependency>
>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-sql_2.12</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>>     <scope>provided</scope>
>>>>>> </dependency>
>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>> </dependency>
>>>>>>
>>>>>> I am building the fat jar with the shade plugin. The jar runs as expected
>>>>>> in my local setup with the command:
>>>>>>
>>>>>> spark-submit --master local[*] --class com.stream.Main --num-executors 3 \
>>>>>>     --driver-memory 2g --executor-cores 2 --executor-memory 3g \
>>>>>>     prism-event-synch-rta.jar
>>>>>>
>>>>>> But when I try to run the same jar on the Spark cluster using YARN with the
>>>>>> command:
>>>>>>
>>>>>> spark-submit --master yarn --deploy-mode cluster --class com.stream.Main \
>>>>>>     --num-executors 4 --driver-memory 2g --executor-cores 1 \
>>>>>>     --executor-memory 4g gs://jars/prism-event-synch-rta.jar
>>>>>>
>>>>>> I get this exception:
>>>>>>
>>>>>>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>>>>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
>>>>>> Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
>>>>>>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>>>>>>
>>>>>> I have also tried setting "partition.assignment.strategy" explicitly,
>>>>>> but it still does not work.
>>>>>>
>>>>>> Please help.
>>>>>>
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> Amit Joshi
>>>>>>
>>>>>>
