Re: HBase connector does not read ZK configuration from Spark session

2018-02-22 Thread Jorge Machado
Could it be that you are missing the HBASE_HOME environment variable?

Jorge Machado






> On 23 Feb 2018, at 04:55, Dharmin Siddesh J  wrote:
> 
> I am trying to write a Spark program that reads data from HBase and stores it 
> in a DataFrame.
> 
> I am able to run it perfectly with hbase-site.xml in the $SPARK_HOME/conf 
> folder, but I am facing a few issues here.
> 
> Issue 1
> 
> The first issue is passing hbase-site.xml location with the --files parameter 
> submitted through client mode (it works in cluster mode).
> 
> 
> 
> When I removed hbase-site.xml from $SPARK_HOME/conf and tried to execute it 
> in client mode by passing it with the --files parameter over YARN, I keep 
> getting an exception (which I think means it is not taking the ZooKeeper 
> configuration from hbase-site.xml).
> 
> spark-submit \
>   --master yarn \
>   --deploy-mode client \
>   --files /home/siddesh/hbase-site.xml \
>   --class com.orzota.rs.json.HbaseConnector \
>   --packages com.hortonworks:shc:1.0.0-2.0-s_2.11 \
>   --repositories http://repo.hortonworks.com/content/groups/public/ \
>   target/scala-2.11/test-0.1-SNAPSHOT.jar
> 
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
> 
> 18/02/22 01:43:09 INFO ClientCnxn: Opening socket connection to server 
> localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL 
> (unknown error)
> 
> 18/02/22 01:43:09 WARN ClientCnxn: Session 0x0 for server null, unexpected 
> error, closing socket connection and attempting reconnect
> 
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
> 
> However, it works fine when I run it in cluster mode.
> 
> 
> 
> Issue 2
> 
> Passing the HBase configuration details through the Spark session, which I 
> can't get to work in either client or cluster mode.
> 
> 
> 
> Instead of passing the entire hbase-site.xml I am trying to add the 
> configuration directly in the code by adding it as a configuration parameter 
> in the SparkSession, e.g.:
> 
> 
> 
> val spark = SparkSession
>   .builder()
>   .appName(name)
>   .config("hbase.zookeeper.property.clientPort", "2181")
>   .config("hbase.zookeeper.quorum", "ip1,ip2,ip3")
>   .config("spark.hbase.host", "zookeeperquorum")
>   .getOrCreate()
> 
> val json_df =
>   spark.read.option("catalog", catalog_read).
>   format("org.apache.spark.sql.execution.datasources.hbase").
>   load()
> 
> This does not work in cluster mode either.
> 
> Can anyone help me with a solution, or an explanation of why this is 
> happening? Are there any workarounds?
> 
> 
> 



Re: Consuming Data in Parallel using Spark Streaming

2018-02-22 Thread naresh Goud
Here is my understanding; I hope it gives you some idea of how this works.
It may be wrong, so please excuse me if it is. I am trying to derive the
execution model from my own understanding. Sorry for the long email.

The driver keeps polling Kafka for the latest offset of each topic and then
schedules jobs with the offsets pulled from the topic metadata.

Here, job = (processing logic1 + logic2 + logic3).
These pieces of logic are executed sequentially in the executor, in the order
defined in your application code.

Whenever a job starts, it runs as one transaction that includes the
following activities:
Transaction
{
Get data from Kafka.
Execute logic1 -> logic2 -> logic3
Update processed record offset information
}

Having said that, coming to the approach you mentioned for parallel processing:
if you pass three topics to a single createDirectStream call, Spark will poll
once to get the offsets of all topics, instead of polling three times as it
does when you create the streams with separate createDirectStream calls.

With that approach, the job is scheduled as below.
Job{
   Logic1 with its topic offsets
   Logic2 with its topic offsets
   Logic3 with its topic offsets
}

With this approach, too, the logics are executed sequentially.

Let's come to your last point, differentiating the data somehow. I am
assuming your application logic is as below, and the scheduled job would look
like this:

Job{
If(topic1 record) {execute logic1}
If(topic2 record) {execute logic2}
If(topic3 record) {execute logic3}
}

This also leads to sequential execution.


Distributed systems are not designed to execute parts of a job in parallel;
instead, they execute the whole job across partitions of the data in parallel.

To summarize, parallelism is possible within the processing of each topic, not
across the processing of different topics. For example, if a topic has 3
partitions, then 3 executor tasks would run in parallel executing the job.





On Thu, Feb 22, 2018 at 9:44 PM Vibhakar, Beejal <
beejal.vibha...@fisglobal.com> wrote:

> Naresh – Thanks for taking the time to respond.
>
>
>
> So is it right to say that it’s the driver program which, every 30
> seconds, tells the executors (which manage the streams) to run, rather than
> each executor making that decision itself? And this really makes it
> sequential execution in my case?
>
>
>
> BTW, do you think the following would be a more suitable way to run this in
> parallel?
>
>
>
> - Right now I am creating 3 DStreams, one for each entity, using the
>   KafkaUtils.createDirectStream API
> - While creating each DStream, I pass in a single Kafka topic
> - Instead of creating 3 DStreams, if I create a single DStream and
>   pass in multiple Kafka topics (TOPIC1, TOPIC2, TOPIC3), it should be
>   able to parallelize the processing (we just need to allocate the right
>   number of executors)
> - To have separate processing logic for each entity, I just need some
>   way to differentiate records of one type of entity from the other types
>   of entities.
>
>
>
> -Beejal
>
>
>
> *From:* naresh Goud [mailto:nareshgoud.du...@gmail.com]
> *Sent:* Friday, February 23, 2018 8:56 AM
> *To:* Vibhakar, Beejal 
> *Subject:* Re: Consuming Data in Parallel using Spark Streaming
>
>
>
> You will have the same behavior both locally and on a Hadoop cluster,
> since there will be only one streaming context in the driver, which runs
> in a single JVM.
>
>
>
> On Wed, Feb 21, 2018 at 9:12 PM, Vibhakar, Beejal <
> beejal.vibha...@fisglobal.com> wrote:
>
> I am trying to process data from 3 different Kafka topics using 3
> InputDStream with a single StreamingContext. I am currently testing this
> under Sandbox where I see data processed from one Kafka topic followed by
> other.
>
>
>
> *Question#1:* I want to understand that when I run this program in Hadoop
> cluster, will it process the data in parallel from 3 Kafka topics OR will I
> see the same behavior as I see in my Sandbox?
>
>
>
> *Question#2:* I aim to process the data from all three Kafka topics in
> parallel.  Can I achieve this without breaking this program into 3 separate
> smaller programs?
>
>
>
> Here’s what the code template looks like:
>
>    // Batch interval of 30 seconds (the constructor takes a Duration)
>    val ssc = new StreamingContext(sc, Seconds(30))
>
>    val topic1 = Array("TOPIC1")
>
>    val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
>      ssc,
>      PreferConsistent,
>      Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))
>
>    // Processing logic for dataStreamTopic1
>
>    val topic2 = Array("TOPIC2")
>
>    val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
>      ssc,
>      PreferConsistent,
>      Subscribe[Array[Byte], GenericRecord](topic2, kafkaParms))
>
>    // Processing logic for dataStreamTopic2
>
>    val topic3 = Array("TOPIC3")
>
>    val dataStreamTopic3 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
>      ssc,
>      PreferConsiste

Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread naresh Goud
Got it. I had understood the issue differently.



On Thu, Feb 22, 2018 at 9:19 PM Keith Chapman 
wrote:

> My issue is that there is not enough pressure on GC, hence GC is not
> kicking in fast enough to delete the shuffle files of previous iterations.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud 
> wrote:
>
>> It would be very difficult to tell without knowing what your
>> application code is doing and what kind of transformations/actions it
>> performs. From my previous experience, tuning application code to avoid
>> creating unnecessary objects reduces pressure on GC.
>>
>>
>> On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm benchmarking a Spark application by running it for multiple
>>> iterations. It's a benchmark that's heavy on shuffle, and I run it on a local
>>> machine with a very large heap (~200GB). The system has an SSD. When running
>>> for 3 to 4 iterations I get into a situation where I run out of disk space
>>> on the /tmp directory. On further investigation I was able to figure out
>>> that the reason for this is that the shuffle files are still around:
>>> because I have a very large heap, GC has not happened and hence the shuffle
>>> files are not deleted. I was able to confirm this by lowering the heap size:
>>> I then see GC kicking in more often, and the size of /tmp stays under
>>> control. Is there any way I could configure Spark to handle this issue?
>>>
>>> One option that I have is to have GC run more often by
>>> setting spark.cleaner.periodicGC.interval to a much lower value. Is there a
>>> cleaner solution?
>>>
>>> Regards,
>>> Keith.
>>>
>>> http://keith-chapman.com
>>>
>>
>>
>


HBase connector does not read ZK configuration from Spark session

2018-02-22 Thread Dharmin Siddesh J
I am trying to write a Spark program that reads data from HBase and stores
it in a DataFrame.

I am able to run it perfectly with hbase-site.xml in the $SPARK_HOME/conf
folder, but I am facing a few issues here.

Issue 1

The first issue is passing hbase-site.xml location with the --files
parameter submitted through client mode (it works in cluster mode).


When I removed hbase-site.xml from $SPARK_HOME/conf and tried to execute it
in client mode by passing it with the --files parameter over YARN, I keep
getting an exception (which I think means it is not taking the
ZooKeeper configuration from hbase-site.xml).

spark-submit \
  --master yarn \
  --deploy-mode client \
  --files /home/siddesh/hbase-site.xml \
  --class com.orzota.rs.json.HbaseConnector \
  --packages com.hortonworks:shc:1.0.0-2.0-s_2.11 \
  --repositories http://repo.hortonworks.com/content/groups/public/ \
  target/scala-2.11/test-0.1-SNAPSHOT.jar

at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)

18/02/22 01:43:09 INFO ClientCnxn: Opening socket connection to server
localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL
(unknown error)

18/02/22 01:43:09 WARN ClientCnxn: Session 0x0 for server null, unexpected
error, closing socket connection and attempting reconnect

java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)

However, it works fine when I run it in cluster mode.
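
One way to narrow this down: the HBase client configuration is loaded from
hbase-site.xml on the JVM classpath, and when the file is not found the
ZooKeeper quorum falls back to localhost, which would match the
localhost:2181 connection attempts in the log above. The sketch below is
plain Scala, and the object and method names are made up for illustration.
It only checks whether the driver JVM can see the file, and could be called
at the start of the driver's main method.

```scala
object HBaseSiteCheck {

  /** Returns the URL of a classpath resource if the context class loader can see it. */
  def findOnClasspath(name: String): Option[String] =
    Option(Thread.currentThread().getContextClassLoader.getResource(name)).map(_.toString)

  def main(args: Array[String]): Unit = {
    findOnClasspath("hbase-site.xml") match {
      case Some(url) =>
        println(s"hbase-site.xml found at $url")
      case None =>
        // Without the file, HBase client defaults (localhost quorum) apply.
        println("hbase-site.xml is NOT on the classpath of this JVM")
    }
  }
}
```

If the file turns out to be missing on the driver in client mode, adding
the directory that contains it to the driver classpath (for example with
spark-submit's --driver-class-path) is one thing to try; whether that is
sufficient for this connector is an assumption that needs testing.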


Issue 2

Passing the HBase configuration details through the Spark session, which I
can't get to work in either client or cluster mode.


Instead of passing the entire hbase-site.xml, I am trying to add the
configuration directly in the code by adding it as a configuration
parameter in the SparkSession, e.g.:


val spark = SparkSession
  .builder()
  .appName(name)
  .config("hbase.zookeeper.property.clientPort", "2181")
  .config("hbase.zookeeper.quorum", "ip1,ip2,ip3")
  .config("spark.hbase.host", "zookeeperquorum")
  .getOrCreate()

val json_df =
  spark.read.option("catalog", catalog_read).
  format("org.apache.spark.sql.execution.datasources.hbase").
  load()

This does not work in cluster mode either.


Can anyone help me with a solution, or an explanation of why this is
happening? Are there any workarounds?


RE: Consuming Data in Parallel using Spark Streaming

2018-02-22 Thread Vibhakar, Beejal
Naresh – Thanks for taking the time to respond.

So is it right to say that it’s the driver program which, every 30 seconds, 
tells the executors (which manage the streams) to run, rather than each executor 
making that decision itself? And this really makes it sequential execution 
in my case?

BTW, do you think the following would be a more suitable way to run this in parallel?


  *   Right now I am creating 3 DStreams, one for each entity, using the 
KafkaUtils.createDirectStream API
  *   While creating each DStream, I pass in a single Kafka topic
  *   Instead of creating 3 DStreams, if I create a single DStream and pass 
in multiple Kafka topics (TOPIC1, TOPIC2, TOPIC3), it should be able to 
parallelize the processing (we just need to allocate the right number of executors)
  *   To have separate processing logic for each entity, I just need some way 
to differentiate records of one type of entity from the other types of entities.
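
The single-stream variant in the list above could be sketched roughly as
follows. This is only an illustration under assumptions: the
processEntity1/2/3 handlers are hypothetical placeholders, GenericRecord is
assumed to be Avro's, and kafkaParams is assumed to hold the usual consumer
settings. Each Kafka ConsumerRecord carries its topic name, which is what
the dispatch relies on. Whether this actually improves parallelism depends
on the number of topic partitions and the cores available.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MultiTopicStream {

  // Hypothetical per-entity handlers; replace with the real processing logic.
  def processEntity1(value: GenericRecord): Unit = ()
  def processEntity2(value: GenericRecord): Unit = ()
  def processEntity3(value: GenericRecord): Unit = ()

  def run(ssc: StreamingContext, kafkaParams: Map[String, Object]): Unit = {
    val topics = Array("TOPIC1", "TOPIC2", "TOPIC3")

    // One direct stream subscribed to all three topics.
    val stream = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
      ssc,
      PreferConsistent,
      Subscribe[Array[Byte], GenericRecord](topics, kafkaParams))

    stream.foreachRDD { rdd =>
      // Each RDD partition corresponds to one Kafka topic partition.
      rdd.foreachPartition { records =>
        records.foreach { record =>
          // Dispatch on the topic the record came from.
          record.topic match {
            case "TOPIC1" => processEntity1(record.value)
            case "TOPIC2" => processEntity2(record.value)
            case "TOPIC3" => processEntity3(record.value)
            case _        => () // ignore anything unexpected
          }
        }
      }
    }
  }
}
```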

-Beejal

From: naresh Goud [mailto:nareshgoud.du...@gmail.com]
Sent: Friday, February 23, 2018 8:56 AM
To: Vibhakar, Beejal 
Subject: Re: Consuming Data in Parallel using Spark Streaming

You will have the same behavior both locally and on a Hadoop cluster,
since there will be only one streaming context in the driver, which runs in a single JVM.

On Wed, Feb 21, 2018 at 9:12 PM, Vibhakar, Beejal 
<beejal.vibha...@fisglobal.com> wrote:
I am trying to process data from 3 different Kafka topics using 3 InputDStreams 
with a single StreamingContext. I am currently testing this under Sandbox, where 
I see data processed from one Kafka topic followed by the others.

Question#1: I want to understand whether, when I run this program on a Hadoop 
cluster, it will process the data from the 3 Kafka topics in parallel, OR will I see 
the same behavior as I see in my Sandbox?

Question#2: I aim to process the data from all three Kafka topics in parallel.  
Can I achieve this without breaking this program into 3 separate smaller 
programs?

Here’s what the code template looks like:

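import org.apache.avro.generic.GenericRecord  // assuming Avro's GenericRecord
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Batch interval of 30 seconds (the constructor takes a Duration, not an Int)
val ssc = new StreamingContext(sc, Seconds(30))

val topic1 = Array("TOPIC1")

val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))

// Processing logic for dataStreamTopic1


val topic2 = Array("TOPIC2")

val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic2, kafkaParms))

// Processing logic for dataStreamTopic2


val topic3 = Array("TOPIC3")

val dataStreamTopic3 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic3, kafkaParms))

// Processing logic for dataStreamTopic3

// Start the streaming computation and block until it terminates
ssc.start()
ssc.awaitTermination()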

Here’s how I submit my spark job on my sandbox…

./bin/spark-submit --class  --master local[*] 

Thanks,
Beejal


The information contained in this message is proprietary and/or confidential. 
If you are not the intended recipient, please: (i) delete the message and all 
copies; (ii) do not disclose, distribute or use the message in any manner; and 
(iii) notify the sender immediately. In addition, please be aware that any 
message addressed to our domain is subject to archiving and review by persons 
other than the intended recipient. Thank you.



Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
My issue is that there is not enough pressure on GC, hence GC is not
kicking in fast enough to delete the shuffle files of previous iterations.
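
For reference, the periodic-GC option mentioned in the original question
could be set roughly as below. This is a sketch under assumptions, not a
recommendation: the "1min" value, the application name and the runIteration
helper are made up for illustration. Shuffle cleanup is triggered when the
driver garbage-collects the objects that reference a shuffle, so the sketch
also requests a GC between iterations; System.gc() is only a hint to the JVM.

```scala
import org.apache.spark.sql.SparkSession

object ShuffleCleanupSketch {

  // Hypothetical stand-in for one iteration of the shuffle-heavy benchmark.
  def runIteration(spark: SparkSession): Unit = {
    import spark.implicits._
    spark.range(0, 1000000).groupBy(($"id" % 100).as("k")).count().collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("shuffle-benchmark")
      .master("local[*]")
      // The default interval is 30min; a lower value makes the driver request GC more often.
      .config("spark.cleaner.periodicGC.interval", "1min")
      .getOrCreate()

    for (_ <- 1 to 4) {
      runIteration(spark)
      // Ask the JVM to collect so that unreferenced shuffles can be cleaned up.
      System.gc()
    }

    spark.stop()
  }
}
```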

Regards,
Keith.

http://keith-chapman.com

On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud 
wrote:

> It would be very difficult to tell without knowing what your
> application code is doing and what kind of transformations/actions it
> performs. From my previous experience, tuning application code to avoid
> creating unnecessary objects reduces pressure on GC.
>
>
> On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman 
> wrote:
>
>> Hi,
>>
>> I'm benchmarking a Spark application by running it for multiple
>> iterations. It's a benchmark that's heavy on shuffle, and I run it on a local
>> machine with a very large heap (~200GB). The system has an SSD. When running
>> for 3 to 4 iterations I get into a situation where I run out of disk space
>> on the /tmp directory. On further investigation I was able to figure out
>> that the reason for this is that the shuffle files are still around:
>> because I have a very large heap, GC has not happened and hence the shuffle
>> files are not deleted. I was able to confirm this by lowering the heap size:
>> I then see GC kicking in more often, and the size of /tmp stays under
>> control. Is there any way I could configure Spark to handle this issue?
>>
>> One option that I have is to have GC run more often by
>> setting spark.cleaner.periodicGC.interval to a much lower value. Is
>> there a cleaner solution?
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>
>


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread naresh Goud
It would be very difficult to tell without knowing what your application
code is doing and what kind of transformations/actions it performs. From my
previous experience, tuning application code to avoid creating unnecessary
objects reduces pressure on GC.


On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman 
wrote:

> Hi,
>
> I'm benchmarking a Spark application by running it for multiple
> iterations. It's a benchmark that's heavy on shuffle, and I run it on a local
> machine with a very large heap (~200GB). The system has an SSD. When running
> for 3 to 4 iterations I get into a situation where I run out of disk space
> on the /tmp directory. On further investigation I was able to figure out
> that the reason for this is that the shuffle files are still around:
> because I have a very large heap, GC has not happened and hence the shuffle
> files are not deleted. I was able to confirm this by lowering the heap size:
> I then see GC kicking in more often, and the size of /tmp stays under
> control. Is there any way I could configure Spark to handle this issue?
>
> One option that I have is to have GC run more often by
> setting spark.cleaner.periodicGC.interval to a much lower value. Is there
> a cleaner solution?
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>


Re: Return statements aren't allowed in Spark closures

2018-02-22 Thread naresh Goud
I am not able to reproduce the error either.
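
For context on the check quoted below: the ClosureCleaner looks for
scala/runtime/NonLocalReturnControl, which is what the Scala 2 compiler
emits when a `return` sits inside a lambda and has to exit the enclosing
method. The plain-Scala sketch below (names made up for illustration) shows
that construct and an equivalent without `return`. It is not necessarily
what triggered the error in the original report, where the `return` is
inside a method rather than inside a lambda.

```scala
object ReturnInClosure {

  // The `return` inside the lambda exits firstNegative itself; the compiler
  // implements this by throwing and catching NonLocalReturnControl.
  def firstNegative(xs: Seq[Int]): Option[Int] = {
    xs.foreach { x => if (x < 0) return Some(x) }
    None
  }

  // Same result without any `return`, so nothing non-local is generated.
  def firstNegativeNoReturn(xs: Seq[Int]): Option[Int] =
    xs.find(_ < 0)
}
```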

On Thu, Feb 22, 2018 at 2:51 AM, Michael Artz 
wrote:

> I am not able to reproduce your error. You should do something before you
> call that last function and maybe get some more help from the exception it
> returns. For example, just add a csv.show(1) on the line before. Also, can you
> post the different exception you got when you took out the "return", as
> Bryan suggested?
>
> It's getting to this bit of code
>
> private[spark] class ReturnStatementInClosureException
>   extends SparkException("Return statements aren't allowed in Spark closures")
>
> private class ReturnStatementFinder extends ClassVisitor(ASM5) {
>   override def visitMethod(access: Int, name: String, desc: String,
>       sig: String, exceptions: Array[String]): MethodVisitor = {
>     if (name.contains("apply")) {
>       new MethodVisitor(ASM5) {
>         override def visitTypeInsn(op: Int, tp: String) {
>           if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
>             throw new ReturnStatementInClosureException
>           }
>         }
>       }
>     } else {
>       new MethodVisitor(ASM5) {}
>     }
>   }
> }
>
> and it must see the NonLocalReturnControl exception. My first guess is
> that the "queryYahoo" function is doing something that is causing an
> exception, but for some reason (Networking thing maybe?) it works ok in
> spark-shell.
>
> On Feb 21, 2018 10:47 PM, "Lian Jiang"  wrote:
>
>> Sorry Bryan. Unfortunately, this is not the root cause.
>>
>> Any other ideas? This is blocking my scenario. Thanks.
>>
>> On Wed, Feb 21, 2018 at 4:26 PM, Bryan Jeffrey 
>> wrote:
>>
>>> Lian,
>>>
>>> You're writing Scala. Just remove the 'return'. No need for it in Scala.
>>>
>>> Get Outlook for Android 
>>>
>>> --
>>> *From:* Lian Jiang 
>>> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
>>> *To:* user
>>> *Subject:* Return statements aren't allowed in Spark closures
>>>
>>> I can run the code below in spark-shell using yarn client mode.
>>>
>>> val csv = spark.read.option("header", "true").csv("my.csv")
>>>
>>> def queryYahoo(row: Row) : Int = { return 10; }
>>>
>>> csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => { queryYahoo(r) })}
>>>
>>> However, the same code fails when run using spark-submit in yarn client
>>> or cluster mode with this error:
>>>
>>> 18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception:
>>> org.apache.spark.util.ReturnStatementInClosureException: Return
>>> statements aren't allowed in Spark closures
>>>
>>> org.apache.spark.util.ReturnStatementInClosureException: Return
>>> statements aren't allowed in Spark closures
>>>
>>> at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)
>>> at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
>>> at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
>>> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
>>> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
>>> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
>>> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>
>>>
>>> Any idea? Thanks.
>>>
>>
>>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread kant kodali
Hi TD,

I pulled your commit that is listed on this ticket:
https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did the
following steps, and self joins work after I cherry-picked your commit!
Good job! I was hoping it would be part of 2.3.0, but it looks like it is
targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz \
  -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn


On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das  wrote:

> Hey,
>
> Thanks for testing out stream-stream joins and reporting this issue. I am
> going to take a look at this.
>
> TD
>
>
>
> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>
>> if I change it to the below code it works. However, I don't believe it is
>> the solution I am looking for. I want to be able to do it in raw SQL and
>> moreover, If a user gives a big chained raw spark SQL join query I am not
>> even sure how to make copies of the dataframe to achieve the self-join. Is
>> there any other way here?
>>
>>
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "join_test")
>>   .option("startingOffsets", "earliest")
>>   .load()
>> val jdf1 = spark.readStream.format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "join_test")
>>   .option("startingOffsets", "earliest")
>>   .load()
>>
>> // Register the two copies under different view names
>> jdf.createOrReplaceTempView("table")
>> jdf1.createOrReplaceTempView("table1")
>>
>> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>>
>> resultdf.writeStream.outputMode("append").format("console")
>>   .option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>>
>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>>
>>> If I change it to this
>>>
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have the following code
>>>>
>>>> import org.apache.spark.sql.streaming.Trigger
>>>>
>>>> val jdf = spark.readStream.format("kafka")
>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>   .option("subscribe", "join_test")
>>>>   .option("startingOffsets", "earliest")
>>>>   .load()
>>>>
>>>> jdf.createOrReplaceTempView("table")
>>>>
>>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
>>>>
>>>> resultdf.writeStream.outputMode("update").format("console")
>>>>   .option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>
>>>> and I get the following exception.
>>>>
>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>> 'Project [*]
>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>    :- SubqueryAlias x
>>>>    :  +- SubqueryAlias table
>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>    +- SubqueryAlias y
>>>>       +- SubqueryAlias table
>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>
>>>> Any idea what's wrong here?
>>>>
>>>> Thanks!
>>>>
>>>
>>
>


Re: Can spark handle this scenario?

2018-02-22 Thread Lian Jiang
Hi Vijay,

Should HTTPConnection() (or any other object created per partition) be
serializable for your code to work? If so, the usage seems to be limited.

Sometimes the error caused by a non-serializable object can be very
misleading (e.g. "Return statements aren't allowed in Spark closures"
instead of "Task not serializable").

The post shared by Anastasios helps but does not completely resolve the
"need serialization" problem. For example, if I need to create a
per-partition object that relies on other objects which may not be
serializable, then wrapping the object creation in an object, making it a
static function, does not help, not to mention that the programming model
becomes unintuitive.

I have been playing with this scenario for some time and am still frustrated. Thanks.
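
One pattern that is sometimes used for this situation is a serializable
wrapper that holds the non-serializable resource in a `@transient lazy val`,
so only the construction parameters are shipped and the resource is rebuilt
on first use after deserialization. The sketch below is plain Scala;
FakeConnection and ConnectionProvider are made-up names standing in for a
real connection class, and the round trip uses Java serialization only to
imitate what happens when a closure is shipped. It does not cover every
case described above, only the one where the dependencies can be
reconstructed from serializable parameters.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for a non-serializable resource such as an HTTP connection (hypothetical).
class FakeConnection(val endpoint: String) {
  def fetch(symbol: String): String = s"$endpoint/$symbol"
}

// Serializable wrapper: only the endpoint string is serialized; the connection
// is marked transient and is recreated lazily wherever the wrapper is used.
class ConnectionProvider(endpoint: String) extends Serializable {
  @transient lazy val connection: FakeConnection = new FakeConnection(endpoint)
}

object TransientLazyDemo {

  /** Serializes and deserializes a value, as shipping it to another JVM would. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

In a Spark job the provider would be created on the driver and referenced
inside foreachPartition, so that each executor builds its own connection.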






On Tue, Feb 20, 2018 at 12:47 AM, vijay.bvp  wrote:

> I am assuming the pullSymbolFromYahoo function opens a connection to the
> Yahoo API with some token passed. In the code provided so far, if you have
> 2000 symbols, it will make 2000 new connections and 2000 API calls.
> Connection objects can't/shouldn't be serialized and sent to executors;
> they should rather be created at the executors.
>
> The philosophy given below is nicely documented for Spark Streaming; look at
> "Design Patterns for using foreachRDD":
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
>
> case class Symbol(symbol: String, sector: String)
> case class Tick(symbol: String, sector: String, open: Double, close: Double)
> // assume symbolRdd is an RDD of Symbol; a Dataset/DataFrame can be
> // converted to an RDD
> symbolRdd.foreachPartition(partition => {
>   // this code runs at the executor
>   // open a connection here, once per partition
>   val connectionToYahoo = new HTTPConnection()
>
>   partition.foreach(k => {
>     pullSymbolFromYahoo(k.symbol, k.sector, connectionToYahoo)
>   })
> })
>
> With the above code, if the dataset has 10 partitions (2000 symbols), only
> 10 connections will be opened, though it still makes 2000 API calls.
> You should also look at sending and receiving results for a large number
> of symbols in bulk: because of the amount of parallelism that Spark
> provides, you might run into rate limits on the APIs. If you are
> bulk-sending symbols, the above pattern is also very useful.
>
> thanks
> Vijay
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: parquet vs orc files

2018-02-22 Thread Jörn Franke
Look at the documentation of the formats. In any case:
* additionally, use partitions on the filesystem
* sort the data on the filter columns; otherwise you do not benefit from min/max 
indexes and bloom filters
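
As a rough illustration of the two points above, a write could look like
the sketch below. The column names event_date and user_id and the object
name are made up for the example, and the orc.bloom.filter.columns write
option assumes a Spark version whose ORC writer passes ORC options through.
Whether the filters pay off depends on the data and the queries, so it is
worth testing.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col

object SortedOrcWrite {

  /** Writes df as ORC, partitioned on disk by event_date and sorted by user_id within each file. */
  def writeSorted(df: DataFrame, path: String): Unit = {
    df.repartition(col("event_date"))        // group rows of one date into the same task
      .sortWithinPartitions("user_id")       // sorted data gives tight min/max ranges per stripe
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy("event_date")             // directory-level partitioning on the filesystem
      .option("orc.bloom.filter.columns", "user_id")
      .orc(path)
  }
}
```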



> On 21. Feb 2018, at 22:58, Kane Kim  wrote:
> 
> Thanks, how does min/max index work? Can spark itself configure bloom filters 
> when saving as orc?
> 
>> On Wed, Feb 21, 2018 at 1:40 PM, Jörn Franke  wrote:
>> In the latest version both are equally well supported.
>> 
>> You need to insert the data sorted on the filtering columns.
>> Then you will benefit from min/max indexes and, in the case of ORC, additionally 
>> from bloom filters, if you configure them.
>> In any case I also recommend partitioning the files (do not confuse this with 
>> Spark partitioning).
>> 
>> What is best for you, you have to figure out in a test. This highly depends 
>> on the data and the analysis you want to do.
>> 
>> > On 21. Feb 2018, at 21:54, Kane Kim  wrote:
>> >
>> > Hello,
>> >
>> > Which format is better supported in spark, parquet or orc?
>> > Will spark use internal sorting of parquet/orc files (and how to test 
>> > that)?
>> > Can spark save sorted parquet/orc files?
>> >
>> > Thanks!
> 


Re: parquet vs orc files

2018-02-22 Thread Kurt Fehlhauer
Hi Kane,

It really depends on your use case. I generally use Parquet because it
seems to have better support beyond Spark. However, if you are dealing with
partitioned Hive tables, the current versions of Spark have an issue where
compression will not be applied. This will be fixed in version 2.3.0. See
https://issues.apache.org/jira/browse/SPARK-21786 for more details. If you
are at the file level compression is applied just fine.

I also agree with Stephen Joung's recommendation. I would also watch the
video on O'Reilly to get more context around that slide deck. This will give
you an in-depth understanding of how Spark takes advantage of sorting.

Regards,
Kurt


On Wed, Feb 21, 2018 at 1:54 PM, Kane Kim  wrote:

> Hello,
>
> Which format is better supported in spark, parquet or orc?
> Will spark use internal sorting of parquet/orc files (and how to test
> that)?
> Can spark save sorted parquet/orc files?
>
> Thanks!
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread Tathagata Das
Hey,

Thanks for testing out stream-stream joins and reporting this issue. I am
going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:

> If I change it to the code below, it works. However, I don't believe it is
> the solution I am looking for. I want to be able to do it in raw SQL;
> moreover, if a user gives a big chained raw Spark SQL join query, I am not
> even sure how to make copies of the dataframe to achieve the self-join. Is
> there any other way here?
>
>
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on 
> table.offset=table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
>
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>
>> If I change it to this
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following code
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as y on 
>>> x.offset=y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> and I get the following exception.
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>>> x.timestamp, x.partition]; line 1 pos 50;
>>> 'Project [*]
>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>    :- SubqueryAlias x
>>>    :  +- SubqueryAlias table
>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>    +- SubqueryAlias y
>>>       +- SubqueryAlias table
>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>
>>> any idea whats wrong here?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Log analysis with GraphX

2018-02-22 Thread JB Data
A new one created with my basic *datayse.*

@*JB*Δ 


2018-02-21 13:14 GMT+01:00 Philippe de Rochambeau :

> Hi JB,
> which column in the 8 line DS do you regress ?
>
>
> On 21 Feb 2018 09:47, JB Data  wrote:
>
> Hi,
>
> Interesting discussion, let me add my *shell* point of view.
> My focus is only prediction. So that pure DS do not cry wolf, I warn you
> how simple my *datayse* of the problem is:
> - No use of the button in the model, only page navigation.
> - User navigation 're-init' when click on a page ever clicked
> - My dataset is only 8 rows large !
> 2018-01-02 12:00:00;OKK;PAG1;1234555
> 2018-01-02 12:01:01;NEX;PAG2;1234555
> 2018-01-02 12:00:02;OKK;PAG1;5556667
> 2018-01-02 12:01:03;NEX;PAG3;5556667
> 2018-01-02 12:01:04;OKK;PAG3;1234555
> 2018-01-02 12:01:04;NEX;PAG1;1234555
> 2018-01-02 12:01:04;NEX;PAG3;1234555
> 2018-01-02 12:01:04;OKK;PAG2;5556667
>
> Anyway... After 250 python lines...
> *Regression with SKlearn*
> Mean squared error: 2.39
> Variance score: -0.54
> *Regression with Keras*
> Results: -4.60 (3.83) MSE
>
> No doubt that increasing *wc -l* will increase *MSE*.
> DL is nowadays the magic wand that everyone wants to wave over data.
> But mastering the wand is not for everyone (myself included); use the wand
> with parsimony...
>
> I create this group  as a
> prerequisite of -1 feedback  :-)
>
>
> @*JB*Δ 
>
>
> 2018-02-10 16:28 GMT+01:00 Philippe de Rochambeau :
>
> Hi Jörn,
> thank you for replying.
> By « path analysis », I mean « the user’s navigation from page to page on
> the website » and by « clicking trends »  I mean «  which buttons does
> he/she click and in what order ». In other words, I’d like to measure, make
> sense out of, and perhaps, predict user behavior.
>
> Philippe
>
>
> > On 10 Feb 2018, at 16:03, Jörn Franke  wrote:
> >
> > What do you mean by path analysis and clicking trends?
> >
> > If you want to use typical graph algorithms such as longest path,
> > shortest path (to detect issues with your navigation page) or page rank,
> > then probably yes. Similarly if you do A/B testing to compare whether you
> > sell more with different navigation or product proposals.
> >
> > It really depends on your analysis. Just because it looks like a graph does
> > not mean you need to do graph analysis.
> > Then another critical element is how to visualize the results of your
> > graph analysis (it does not have to be visualized as a graph; it could
> > also be a table with if/then rules, e.g. if the product is placed at the top
> > right then 50% more people buy it).
> >
> > However, if you want to do some other analysis such as random forests or
> > Markov chains, then GraphX alone will not help you much.
> >
> >> On 10. Feb 2018, at 15:49, Philippe de Rochambeau  wrote:
> >>
> >> Hello,
> >>
> >> Let’s say a website log is structured as follows:
> >>
> >> ;;;
> >>
> >> eg.
> >>
> >> 2018-01-02 12:00:00;OKK;PAG1;1234555
> >> 2018-01-02 12:01:01;NEX;PAG1;1234555
> >> 2018-01-02 12:00:02;OKK;PAG1;5556667
> >> 2018-01-02 12:01:03;NEX;PAG1;5556667
> >>
> >> where OKK stands for the OK Button on Page 1, NEX, the Next Button on
> >> Page 2, …
> >>
> >> Is GraphX the appropriate tool to analyse the website users’ paths and
> >> clicking trends?
> >>
> >> Many thanks.
> >>
> >> Philippe
> >>
> >>
>
>
>
>
>
>


Re: Return statements aren't allowed in Spark closures

2018-02-22 Thread Michael Artz
I am not able to reproduce your error. You should do something with the data
before that last function call and see whether the exception it returns gives
more help. For example, add a csv.show(1) on the line before. Also, can you
post the different exception you got when you took out the "return", as
Bryan suggested?

It's getting to this bit of code

private[spark] class ReturnStatementInClosureException
  extends SparkException("Return statements aren't allowed in Spark closures")

private class ReturnStatementFinder extends ClassVisitor(ASM5) {
  override def visitMethod(access: Int, name: String, desc: String,
      sig: String, exceptions: Array[String]): MethodVisitor = {
    if (name.contains("apply")) {
      new MethodVisitor(ASM5) {
        override def visitTypeInsn(op: Int, tp: String) {
          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
            throw new ReturnStatementInClosureException
          }
        }
      }
    } else {
      new MethodVisitor(ASM5) {}
    }
  }
}

and it must see the NonLocalReturnControl exception. My first guess is that
the "queryYahoo" function is doing something that is causing an exception,
but for some reason (Networking thing maybe?) it works ok in spark-shell.
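
For context on when that bytecode appears: in Scala 2, a `return` written inside a function literal is compiled into a throw of scala.runtime.NonLocalReturnControl, while a `return` directly in a method body (like the one in queryYahoo) is an ordinary return. The sketch below is plain Scala with hypothetical names (`firstAbove`, `firstAboveNoReturn`); it only illustrates the language behaviour and does not reproduce the reported job.

```scala
object NonLocalReturnSketch {
  // The `return` sits inside the lambda passed to foreach, but it exits
  // firstAbove itself. Scala 2 implements this by throwing
  // scala.runtime.NonLocalReturnControl from the lambda and catching it here,
  // which is the instruction pattern ReturnStatementFinder looks for.
  def firstAbove(xs: Seq[Int], limit: Int): Int = {
    xs.foreach { x =>
      if (x > limit) return x
    }
    -1
  }

  // Same result without `return`, so no NonLocalReturnControl is generated.
  def firstAboveNoReturn(xs: Seq[Int], limit: Int): Int =
    xs.find(_ > limit).getOrElse(-1)

  def main(args: Array[String]): Unit = {
    println(firstAbove(Seq(1, 5, 9), 3))         // prints 5
    println(firstAboveNoReturn(Seq(1, 5, 9), 3)) // prints 5
  }
}
```

If the submitted application contains a `return` of the first kind anywhere inside a closure that Spark cleans, that might be what the finder is reacting to; this is a guess, not something the stack trace confirms.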

On Feb 21, 2018 10:47 PM, "Lian Jiang"  wrote:

> Sorry Bryan. Unfortunately, this is not the root cause.
>
> Any other ideas? This is blocking my scenario. Thanks.
>
> On Wed, Feb 21, 2018 at 4:26 PM, Bryan Jeffrey 
> wrote:
>
>> Lian,
>>
>> You're writing Scala. Just remove the 'return'. No need for it in Scala.
>>
>> --
>> *From:* Lian Jiang 
>> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
>> *To:* user
>> *Subject:* Return statements aren't allowed in Spark closures
>>
>> I can run below code in spark-shell using yarn client mode.
>>
>> val csv = spark.read.option("header", "true").csv("my.csv")
>>
>> def queryYahoo(row: Row) : Int = { return 10; }
>>
>> csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => {
>> queryYahoo(r) })}
>>
>> However, the same code failed when run using spark-submit in yarn client
>> or cluster mode due to error:
>>
>> 18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception:
>> org.apache.spark.util.ReturnStatementInClosureException: Return
>> statements aren't allowed in Spark closures
>>
>> org.apache.spark.util.ReturnStatementInClosureException: Return
>> statements aren't allowed in Spark closures
>>
>> at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)
>> at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
>> at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
>> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
>> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
>> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
>> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> at scala.collection.immutable.List.foreach(List.scala:381)
>> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>
>>
>> Any idea? Thanks.
>>
>
>


Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
Hi,

I'm benchmarking a Spark application by running it for multiple iterations.
It's a benchmark that's heavy on shuffle, and I run it on a local machine with
a very large heap (~200GB). The system has an SSD. When running for 3 to 4
iterations I run out of disk space on the /tmp directory. On further
investigation I figured out that the reason is that the shuffle files are
still around: because I have a very large heap, GC has not happened and hence
the shuffle files are not deleted. I confirmed this by lowering the heap size;
GC then kicks in more often and the size of /tmp stays under control. Is there
any way I could configure Spark to handle this issue?

One option that I have is to have GC run more often by
setting spark.cleaner.periodicGC.interval to a much lower value. Is there a
cleaner solution?
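
For reference, the option above could be set as in the following sketch. It assumes a local-mode run with Spark on the classpath. The interval value and the scratch path are illustrative only, and `spark.local.dir` is an extra assumption (a standard Spark setting for scratch space) that only moves the files off /tmp and does not make them disappear sooner.

```scala
import org.apache.spark.sql.SparkSession

object ShuffleCleanupConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("shuffle-cleanup-config-sketch")
      .master("local[*]") // assumption: local benchmark run
      // Ask the driver to trigger a GC more often than the 30min default, so
      // that unreachable shuffles are noticed and their files cleaned up sooner.
      .config("spark.cleaner.periodicGC.interval", "5min")
      // Hypothetical directory on a larger volume for shuffle and spill files.
      // Must be set before the SparkContext is created; a cluster manager may
      // override it through its own environment variables.
      .config("spark.local.dir", "/data/spark-tmp")
      .getOrCreate()

    // ... run the benchmark iterations here ...

    spark.stop()
  }
}
```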

Regards,
Keith.

http://keith-chapman.com


Hortonworks Spark-Hbase-Connector does not read zookeeper configurations from spark session config ??(Spark on Yarn)

2018-02-22 Thread Dharmin Siddesh J
Hi

I am trying to write Spark code that reads data from HBase and stores it
in a DataFrame.
I am able to run it perfectly with hbase-site.xml in the $SPARK_HOME/conf
folder.
But I am facing a few issues here.

Issue 1: Passing the hbase-site.xml location with the --files parameter when
submitting in client mode (it works in cluster mode).

When I remove hbase-site.xml from $SPARK_HOME/conf and try to execute in
client mode, passing the file with the --files parameter over YARN, I keep
getting the following exception, which I think means it is not taking the
ZooKeeper configuration from hbase-site.xml. However, it works fine when
I run it in cluster mode.
Sample command:

spark-submit --master yarn --deploy-mode client \
  --files /home/siddesh/hbase-site.xml \
  --class com.orzota.rs.json.HbaseConnector \
  --packages com.hortonworks:shc:1.0.0-2.0-s_2.11 \
  --repositories http://repo.hortonworks.com/content/groups/public/ \
  target/scala-2.11/test-0.1-SNAPSHOT.jar

at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
18/02/22 01:43:09 INFO ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
18/02/22 01:43:09 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
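
The connection attempt to localhost:2181 is consistent with default settings being used, which would happen if hbase-site.xml is not visible on the driver's classpath (HBaseConfiguration.create() loads it from there). A minimal diagnostic sketch, assuming it is called at the start of the driver's main method; `HBaseSiteLookupSketch` and `locate` are hypothetical names and use only the JDK class loader API:

```scala
object HBaseSiteLookupSketch {
  // Returns the location the class loader resolves for a resource name,
  // or None if the resource is not on the classpath.
  def locate(resource: String): Option[java.net.URL] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Option(loader.getResource(resource))
  }

  def main(args: Array[String]): Unit = {
    locate("hbase-site.xml") match {
      case Some(url) => println(s"hbase-site.xml found at $url")
      case None      => println("hbase-site.xml is not on the classpath")
    }
  }
}
```

Running the same check in client mode and in cluster mode should show whether the file passed with --files is actually reachable by the driver in each case.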

Issue 2: Passing the HBase configuration details through the Spark session
(not working in cluster mode or in client mode).
Instead of passing the entire hbase-site.xml, I am trying to add the
configuration directly in the Spark code, as config parameters on the
Spark session. The following is a sample:

val spark = SparkSession
  .builder()
  .appName(name)
  .config("hbase.zookeeper.property.clientPort", "2181")
  .config("hbase.zookeeper.quorum", "ip1,ip2,ip3")
  .config("spark.hbase.host", "zookeeperquorum")
  .getOrCreate()

val json_df = spark.read
  .option("catalog", catalog_read)
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

But this does not work in cluster mode, and Issue 1 persists in
client mode.

Can anyone help me with a solution, or explain why this is happening? Are
there any workarounds?

regards
Sid