Spark 3 connect to Hive 1.2

2020-07-22 Thread Ashika Umanga
Greetings,

Our standalone Spark 3 cluster is trying to connect to a Hadoop 2.6 cluster
running Hive Server 1.2
(/usr/hdp/2.6.2.0-205/hive/lib/hive-service-1.2.1000.2.6.2.0-205.jar).

import org.apache.spark.sql.functions._
import java.sql.Timestamp

val df1 = spark.createDataFrame(Seq(
  ("id1", "v2", "notshared", Timestamp.valueOf("2019-09-13 10:00:00"), false, 1, "2019-09-13"),
  ("id2", "v3", "notshared", Timestamp.valueOf("2019-09-13 09:00:00"), false, 2, "2019-09-13"),
  ("id2", "v4", "notshared", Timestamp.valueOf("2019-09-14 11:00:00"), false, 3, "2019-09-14"),
  ("id2", "v5", "notshared", Timestamp.valueOf("2019-09-14 13:00:00"), false, 4, "2019-09-14"),
  ("id3", "v4", "notshared", Timestamp.valueOf("2019-09-14 17:00:00"), false, 5, "2019-09-14"),
  ("id4", "v1", "notshared", Timestamp.valueOf("2019-09-15 19:00:00"), false, 6, "2019-09-15")
)).toDF("user_id", "col2", "pidd", "land_ts", "deleted", "offset", "partition")

df1.write.mode("overwrite").saveAsTable("db.spark3_test")

When running the above code, it throws this error:

org.apache.spark.sql.AnalysisException:
> org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table
> spark_27686. Invalid method name: 'get_table_req';
>   at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
>   at



I assume this is because Spark 3 ships with the Hive 2.3.7 metastore client
("hive-metastore-2.3.7.jar"). To work with Hive Server 1.2, can I use
"hive-metastore-1.2.1.spark2.jar" from the Spark 2.4 distribution? Do I need
any other dependencies?
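
For context (a hedged sketch, not something confirmed on this thread): instead of copying jars from the Spark 2.4 distribution by hand, Spark 3 can be pointed at a matching Hive client version through spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars. Something along these lines might work; the jar directory path is an assumption and must contain the Hive 1.2.1 client jars plus their dependencies (setting spark.sql.hive.metastore.jars to "maven" makes Spark download them instead).

import org.apache.spark.sql.SparkSession

// Hedged sketch: build a session whose metastore client matches the Hive 1.2 server.
// "/opt/hive-1.2.1/lib/*" is a placeholder; point it at a directory that actually holds
// the Hive 1.2.1 client jars and their transitive dependencies.
val spark = SparkSession.builder()
  .appName("spark3-hive12-metastore")
  .config("spark.sql.hive.metastore.version", "1.2.1")
  .config("spark.sql.hive.metastore.jars", "/opt/hive-1.2.1/lib/*")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("show tables in db").show()

The same two settings can also be passed as --conf flags to spark-shell or spark-submit.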


How to optimize the configuration and/or code to solve the cache overloading issue?

2020-07-22 Thread Yong Yuan
I have run into a problem with Spark Structured Streaming: the YARN usercache
keeps growing because of a join operation and is never released. How can I
optimize the configuration and/or code to solve this problem?


Spark cluster on AWS EMR:

1 master node, m4.xlarge, 4 cores, 16 GB
2 core nodes, m4.xlarge, 4 cores, 16 GB

YARN configuration:

'yarn.nodemanager.disk-health-checker.enable': 'true',
'yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage': '95.0',
'yarn.nodemanager.localizer.cache.cleanup.interval-ms': '10',
'yarn.nodemanager.localizer.cache.target-size-mb': '1024',
'yarn.nodemanager.pmem-check-enabled': 'false',
'yarn.nodemanager.vmem-check-enabled': 'false',
'yarn.log-aggregation.retain-seconds': '12000'

spark-submit \
  --deploy-mode cluster \
  --num-executors 3 --executor-memory 8G --executor-cores 2



Code snippet:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val monitoring_stream = volume_df_filtered.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      // This join operation consumes the usercache continuously.
      monitoring_df = monitoring_df
        .join(batchDF, monitoring_df("id") === batchDF("id"), "left")
        .select(monitoring_df("id"), monitoring_df("total_volume"), batchDF("volume"))
        .na.fill(0)
      monitoring_df = monitoring_df.withColumn("total_volume",
        monitoring_df("total_volume") + monitoring_df("volume"))
      monitoring_df = monitoring_df.repartition(6)
      batchDF.unpersist()
      spark.catalog.clearCache()
    }
  }
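
One mitigation often suggested for this pattern (a hedged sketch, not something confirmed on this thread) is to truncate the lineage of the accumulating DataFrame on every micro-batch, so the query plan and the associated cached state do not keep growing. A sketch of what that could look like inside foreachBatch, reusing the names above (exact checkpoint behaviour and trade-offs depend on your Spark version):

import org.apache.spark.sql.functions.col

// Hedged sketch: compute the new running state, cut its lineage, then drop the old copy.
val updated = monitoring_df
  .join(batchDF, monitoring_df("id") === batchDF("id"), "left")
  .select(monitoring_df("id"), monitoring_df("total_volume"), batchDF("volume"))
  .na.fill(0)
  .withColumn("total_volume", col("total_volume") + col("volume"))

val previous = monitoring_df
// localCheckpoint() materializes the result and truncates the logical plan, so the plan
// does not grow with every batch; use checkpoint() with a reliable checkpoint directory
// instead if losing executors mid-stream is a concern.
monitoring_df = updated.localCheckpoint()
previous.unpersist()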


Spark DataFrame Creation

2020-07-22 Thread Mark Bidewell
Sorry if this is the wrong place for this.  I am trying to debug an issue
with this library:
https://github.com/springml/spark-sftp

When I attempt to create a dataframe:

spark.read.
  format("com.springml.spark.sftp").
  option("host", "...").
  option("username", "...").
  option("password", "...").
  option("fileType", "csv").
  option("inferSchema", "true").
  option("tempLocation", "/srv/spark/tmp").
  option("hdfsTempLocation", "/srv/spark/tmp").
  load("...")

What I am seeing is that the download occurs on the Spark driver, not on a
Spark worker. This leads to a failure when Spark tries to create the
DataFrame on the worker.

I'm confused by this behavior. My understanding was that load() was lazily
executed on the Spark workers. Why would some of the work be executing on the
driver?

Thanks for your help
-- 
Mark Bidewell
http://www.linkedin.com/in/markbidewell


Re: Spark DataFrame Creation

2020-07-22 Thread Sean Owen
You'd probably do best to ask that project, but scanning the source
code, that looks like how it's meant to work. It downloads to a
temp file on the driver, then copies it to distributed storage, and then
returns a DataFrame over that copy. I can't see how it could be implemented
directly over sftp, as there would be so many pieces missing -
locality, blocking, etc.




Re: Spark DataFrame Creation

2020-07-22 Thread Andrew Melo
Hi Mark,


Looking at the code, it appears that your sftp plugin downloads the
file to a local location and opens from there.

https://github.com/springml/spark-sftp/blob/090917547001574afa93cddaf2a022151a3f4260/src/main/scala/com/springml/spark/sftp/DefaultSource.scala#L38

You may have more luck with an sftp hadoop filesystem plugin that can
read sftp:// URLs directly.

Cheers
Andrew
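
A minimal sketch of that approach (illustrative only, not from the original thread): Hadoop 2.8+ ships an org.apache.hadoop.fs.sftp.SFTPFileSystem, so a CSV can be read straight from an sftp:// URL. The host, credentials, and path below are placeholders, and the exact fs.sftp.* property names should be checked against the SFTPFileSystem of your Hadoop version.

// Illustrative sketch: register Hadoop's SFTP filesystem, then read the CSV directly.
spark.sparkContext.hadoopConfiguration
  .set("fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem")

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("sftp://user:password@sftp.example.com/data/input.csv")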




Spark Job Fails with Unknown Error writing to S3 from AWS EMR

2020-07-22 Thread koti reddy
Hi,

Can someone help resolve this issue?
Thank you in advance.

Error logs :

java.io.EOFException: Unexpected EOF while trying to read response from server
at 
org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:402)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213)
at 
org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1073)
20/07/22 22:43:37 WARN DataStreamer: Error Recovery for
BP-439833631-172.19.222.143-1595381416559:blk_1073742309_1498 in
pipeline 
[DatanodeInfoWithStorage[172.19.222.182:50010,DS-7783002b-d57a-43a3-9d91-9934e2d063f8,DISK],
DatanodeInfoWithStorage[172.19.223.27:50010,DS-1bc8cea7-9c28-4869-aada-55b7d0b0680c,DISK],
DatanodeInfoWithStorage[172.19.223.199:50010,DS-880d5121-16a2-465e-ad20-ca99f4287770,DISK]]:
datanode 
0(DatanodeInfoWithStorage[172.19.222.182:50010,DS-7783002b-d57a-43a3-9d91-9934e2d063f8,DISK])
is bad.
20/07/22 22:44:58 WARN DataStreamer: Exception for
BP-439833631-172.19.222.143-1595381416559:blk_1073742309_1499
java.io.EOFException: Unexpected EOF while trying to read response from server
at 
org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:402)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213)
at 
org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1073)
20/07/22 22:44:58 WARN DataStreamer: Error Recovery for
BP-439833631-172.19.222.143-1595381416559:blk_1073742309_1499 in
pipeline 
[DatanodeInfoWithStorage[172.19.223.27:50010,DS-1bc8cea7-9c28-4869-aada-55b7d0b0680c,DISK],
DatanodeInfoWithStorage[172.19.223.199:50010,DS-880d5121-16a2-465e-ad20-ca99f4287770,DISK],
DatanodeInfoWithStorage[172.19.222.180:50010,DS-40b1c81b-18d1-4d8d-ab49-11904f3dd23c,DISK]]:
datanode 
0(DatanodeInfoWithStorage[172.19.223.27:50010,DS-1bc8cea7-9c28-4869-aada-55b7d0b0680c,DISK])
is bad.
20/07/22 22:47:00 WARN DataStreamer: Exception for
BP-439833631-172.19.222.143-1595381416559:blk_1073742309_1500
java.io.EOFException: Unexpected EOF while trying to read response from server
at 
org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:402)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213)
at 
org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1073)
20/07/22 22:47:00 WARN DataStreamer: Error Recovery for
BP-439833631-172.19.222.143-1595381416559:blk_1073742309_1500 in
pipeline 
[DatanodeInfoWithStorage[172.19.223.199:50010,DS-880d5121-16a2-465e-ad20-ca99f4287770,DISK],
DatanodeInfoWithStorage[172.19.222.180:50010,DS-40b1c81b-18d1-4d8d-ab49-11904f3dd23c,DISK],
DatanodeInfoWithStorage[172.19.223.55:50010,DS-3e5dc677-cd1d-49fc-b50a-4b058ae298aa,DISK]]:
datanode 
0(DatanodeInfoWithStorage[172.19.223.199:50010,DS-880d5121-16a2-465e-ad20-ca99f4287770,DISK])
is bad.
20/07/22 22:47:03 INFO MemoryStore: Block broadcast_4 stored as values
in memory (estimated size 18.0 GB, free 24.5 GB)

-- 
Thanks,
Koti Reddy Nusum,
+1-(660) 541-5623.


Re: Spark Job Fails with Unknown Error writing to S3 from AWS EMR

2020-07-22 Thread Shriraj Bhardwaj
We faced a similar situation with JRE 8u262; try reverting to an earlier version.
