Re: Two different Hive instances running

2018-08-17 Thread Patrick Alwell
You probably need to take a look at your hive-site.xml and see what the 
location is for the Hive Metastore. As for beeline, you can explicitly use an 
instance of Hive server by passing in the JDBC url to the hiveServer when you 
launch the client; e.g. beeline –u “jdbc://example.com:5432”

Try taking a look at this 
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-hive-metastore.html

There should be conf settings you can update to make sure you are using the 
same metastore as the instance of HiveServer.

Hive Wiki is a great resource as well ☺

From: Fabio Wada 
Date: Friday, August 17, 2018 at 11:22 AM
To: "user@spark.apache.org" 
Subject: Two different Hive instances running

Hi,

I am executing a insert into Hive table using SparkSession in Java. When I 
execute select via beeline, I don't see these inserted data. And when I insert 
data using beeline I don't see via my program using SparkSession.

It's looks like there are different Hive instances running.

How can I point to same Hive instance? Using SparkSession and beeline.

Thanks
[mage removed by sender.]ᐧ


Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-02-28 Thread Patrick Alwell
I don’t think sql context is “deprecated” in this sense. It’s still accessible 
by earlier versions of Spark.

But yes, at first glance it looks like you are correct. I don’t see a 
recordWriter method for parquet outside of the SQL package.
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

Here is an example that uses Sql context.  I believe the SQL context  is 
necessary for strongly typed, self describing, binary, columnar formatted files 
like Parquet.
https://community.hortonworks.com/articles/72941/writing-parquet-on-hdfs-using-spark-streaming.html

Otherwise you’ll probably be looking at a customWriter.
https://parquet.apache.org/documentation/latest/

AFAIK,

If you were to implement a custom writer, you still wouldn’t escape the parquet 
formatting paradigm the DF API solves. Spark needs a way to map data types for 
Parquet conversion.

Hope this helps,

-Pat


On 2/28/18, 11:09 AM, "karthikus"  wrote:

Hi all,

I have a Kafka stream data and I need to save the data in parquet format
without using Structured Streaming (due to the lack of Kafka Message header
support). 

val kafkaStream =
  KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](
  topics,
  kafkaParams
)
  )
// process the messages
val messages = kafkaStream.map(record => (record.key, record.value))
val lines = messages.map(_._2)

Now, how do I save it as parquet ? All the examples that I have come across
uses SQLContext which is deprecated. ! Any help appreciated ! 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Patrick Alwell
+1

AFAIK,

vCores are not the same as Cores in AWS. 
https://samrueby.com/2015/01/12/what-are-amazon-aws-vcpus/

I’ve always understood it as cores = num concurrent threads

These posts might help you with your research and why exceeding 5 cores per 
executor doesn’t make sense.

https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/

AWS/ EMR was always a challenge for me. Never understood why it didn’t seem to 
be using all my resources; as you noted.

I would see this as –num-executors = 15 –executor-cores= 5 –executor-memory = 
10gb and then test my application from there.

I only got better performance out of a different class of nodes, e.g. R-series 
instance types. Costs more than the M class; but wound up using less of them 
and my jobs ran faster. I was in the 10+TB jobs territory with TPC data.  ☺ The 
links I provided have a few use cases and trials.

Hope that helps,

-Pat


From: Selvam Raman 
Date: Monday, February 26, 2018 at 1:52 PM
To: Vadim Semenov 
Cc: user 
Subject: Re: Spark EMR executor-core vs Vcores

Thanks. That’s make sense.

I want to know one more think , available vcore per machine is 16 but threads 
per node 8. Am I missing to relate here.

What I m thinking now is number of vote = number of threads.



On Mon, 26 Feb 2018 at 18:45, Vadim Semenov 
> wrote:
All used cores aren't getting reported correctly in EMR, and YARN itself has no 
control over it, so whatever you put in `spark.executor.cores` will be used,
but in the ResourceManager you will only see 1 vcore used per nodemanager.

On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman 
> wrote:
Hi,

spark version - 2.0.0
spark distribution - EMR 5.0.0

Spark Cluster - one master, 5 slaves
Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage



Cluster Metrics
Apps Submitted

Apps Pending

Apps Running

Apps Completed

Containers Running

Memory Used

Memory Total

Memory Reserved

VCores Used

VCores Total

VCores Reserved

Active Nodes

Decommissioning Nodes

Decommissioned Nodes

Lost Nodes

Unhealthy Nodes

Rebooted Nodes

16

0

1

15

5

88.88 GB

90.50 GB

22 GB

5

79

1

5

0

0

5

0

0


I have submitted job with below configuration
--num-executors 5 --executor-cores 10 --executor-memory 20g







spark.task.cpus - be default 1


My understanding is there will be 5 executore each can run 10 task at a time 
and task can share total memory of 20g. Here, i could see only 5 vcores used 
which means 1 executor instance use 20g+10%overhead ram(22gb), 10 core(number 
of threads), 1 Vcore(cpu).

please correct me if my understand is wrong.


how can i utilize number of vcore in EMR effectively. Will Vcore boost 
performance?



--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"

--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark Dataframe and HIVE

2018-02-09 Thread Patrick Alwell
Might sound silly, but are you using a Hive context?
What errors do the Hive query results return?

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

The second part of your questions, you are creating a temp table and then 
subsequently creating another table from that temp view. Doesn’t seem like you 
are reading the table from the spark or hive warehouse.

This works fine for me; albeit I was using spark thrift to communicate with my 
directory of choice.

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row, types
from pyspark.sql.types import *
from pyspark.sql import functions as f
from decimal import *
from datetime import datetime

# instantiate our sparkSession and context
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

# Generating customer orc table files
# load raw data as an RDD
customer_data = sc.textFile("/data/tpch/customer.tbl")
# map the data into an RDD split with pipe delimitations
customer_split = customer_data.map(lambda l: l.split("|"))
# map the split data with a row method; this is where we specificy column names 
and types
# default type is string- UTF8
# there are issues with converting string to date and these issues have been 
addressed
# in those tables with dates: See notes below
customer_row = customer_split.map( lambda r: Row(
custkey=long(r[0]),
name=r[1],
address=r[2],
nationkey=long(r[3]),
phone=r[4],
acctbal=Decimal(r[5]),
mktsegment=r[6],
comment=r[7]
))

# we can have Spark infer the schema, or apply a strict schema and identify 
whether or not we want null values
# in this case we don't want null values for keys; and we want explicit data 
types to support the TPCH tables/ data model
customer_schema = types.StructType([
   types.StructField('custkey',types.LongType(),False)
   ,types.StructField('name',types.StringType())
   ,types.StructField('address',types.StringType())
   ,types.StructField('nationkey',types.LongType(),False)
   ,types.StructField('phone',types.StringType())
   ,types.StructField('acctbal',types.DecimalType())
   ,types.StructField('mktsegment',types.StringType())
   ,types.StructField('comment',types.StringType())])

# we create a dataframe object by referencing our sparkSession class and the 
createDataFrame method
# this method takes two arguments by default (row, schema)
customer_df = spark.createDataFrame(customer_row,customer_schema)

# we can now write a file of type orc by referencing our dataframe object we 
created
customer_df.write.orc("/data/tpch/customer.orc")

# read that same file we created but create a seperate dataframe object
customer_df_orc = spark.read.orc("/data/tpch/customer.orc")

# reference the newly created dataframe object and create a tempView for QA 
purposes
customer_df_orc.createOrReplaceTempView("customer")

# reference the sparkSession class and SQL method in order to issue SQL 
statements to the materialized view
spark.sql("SELECT * FROM customer LIMIT 10").show()

From: "☼ R Nair (रविशंकर नायर)" 
Date: Friday, February 9, 2018 at 7:03 AM
To: "user @spark/'user @spark'/spark users/user@spark" 
Subject: Re: Spark Dataframe and HIVE

An update: (Sorry I missed)

When I do

passion_df.createOrReplaceTempView("sampleview")

spark.sql("create table sample table as select * from sample view")

Now, I can see table and can query as well.

So why this do work from Spark and other method discussed below is not?

Thanks



On Fri, Feb 9, 2018 at 9:49 AM, ☼ R Nair (रविशंकर नायर) 
> wrote:
All,

It has been three days continuously I am on this issue. Not getting any clue.

Environment: Spark 2.2.x, all configurations are correct. hive-site.xml is in 
spark's conf.

1) Step 1: I created a data frame DF1 reading a csv file.

2) Did  manipulations on DF1. Resulting frame is passion_df.

3) passion_df.write.format("orc").saveAsTable("sampledb.passion")

4) The metastore shows the hive table., when I do "show tables" in HIVE, I can 
see table name

5) I can't select in HIVE, though I can select from SPARK as spark.sql("select 
* from sampledb.passion")

Whats going on here? Please help. Why I am not seeing data from HIVE prompt?
The "describe formatted " command on the table in HIVE shows he data is is in 
default warehouse location ( /user/hive/warehouse) since I set it.

I am not getting any definite answer anywhere. Many suggestions and answers 
given in Stackoverflow et al.Nothing really works.

So asking experts here for some light on this, thanks

Best,
Ravion





--
[mage removed by sender.]


Re: I can't save DataFrame from running Spark locally

2018-01-23 Thread Patrick Alwell
Spark cannot read locally from S3 without an S3a protocol; you’ll more than 
likely need a local copy of the data or you’ll need to utilize the proper jars 
to enable S3 communication from the edge to the datacenter.

https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark

Here are the jars: 
https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

Looks like you already have them, in which case you’ll have to make small 
configuration changes, e.g. s3 --> s3a

Keep in mind: The Amazon JARs have proven very brittle: the version of the 
Amazon libraries must match the versions against which the Hadoop binaries were 
built.

https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client




From: Toy 
Date: Tuesday, January 23, 2018 at 11:33 AM
To: "user@spark.apache.org" 
Subject: I can't save DataFrame from running Spark locally

Hi,

First of all, my Spark application runs fine in AWS EMR. However, I'm trying to 
run it locally to debug some issue. My application is just to parse log files 
and convert to DataFrame then convert to ORC and save to S3. However, when I 
run locally I get this error

java.io.IOException: /orc/dt=2018-01-23 doesn't exist
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at 
org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)

Here's what I have in sbt

scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"
val hadoopVersion = "2.7.3"
val awsVersion = "1.11.155"

lazy val sparkAndDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,

  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion
)

And this is where the code failed

val sparrowWriter = 
sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
sparrowWriter.save(sparrowOutputPath)

sparrowOutputPath is something like s3://bucket/folder and it exists I checked 
it with aws command line

I put a breakpoint there and the full path looks like this 
s3://bucket/orc/dt=2018-01-23 which exists.

I have also set up the credentials like this

sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")

My confusion is this code runs fine in the cluster but I get this error running 
locally.




Re: Spark on EMR suddenly stalling

2017-12-28 Thread Patrick Alwell
Joren,

Anytime there is a shuffle in the network, Spark moves to a new stage. It seems 
like you are having issues either pre or post shuffle. Have you looked at a 
resource management tool like ganglia to determine if this is a memory or 
thread related issue? The spark UI?

You are using groupByKey() have you thought of an alternative like 
aggregateByKey() or combineByKey() to reduce shuffling?
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re/avoid-groupbykey-when-performing-a-group-of-multiple-items-by-key.html

Dynamic allocation is great; but sometimes I’ve found explicitly setting the 
num executors, cores per executor, and memory per executor to be a better 
alternative.

Take a look at the yarn logs as well for the particular executor in question. 
Executors can have multiple tasks; and will often fail if they have more tasks 
than available threads.

As for partitioning the data; you could also look into your level of 
parallelism which is correlated to the splitablity (blocks) of data. This will 
be based on your largest RDD.
https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism

Spark is like C/C++ you need to manage the memory buffer or the compiler will 
through you out  ;)
https://spark.apache.org/docs/latest/hardware-provisioning.html

Hang in there, this is the more complicated stage of placing a spark 
application into production. The Yarn logs should point you in the right 
direction.

It’s tough to debug over email, so hopefully this information is helpful.

-Pat


On 12/28/17, 9:57 AM, "Jeroen Miller"  wrote:

On 28 Dec 2017, at 17:41, Richard Qiao  wrote:
> Are you able to specify which path of data filled up?

I can narrow it down to a bunch of files but it's not so straightforward.

> Any logs not rolled over?

I have to manually terminate the cluster but there is nothing more in the 
driver's log when I check it from the AWS console when the cluster is still 
running. 

JM


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark based Data Warehouse

2017-11-12 Thread Patrick Alwell
Alcon,

You can most certainly do this. I’ve done benchmarking with Spark SQL and the 
TPCDS queries using S3 as the filesystem.

Zeppelin and Livy server work well for the dash boarding and concurrent query 
issues:  https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/

Livy Server will allow you to create multiple spark contexts via REST: 
https://livy.incubator.apache.org/

If you are looking for broad SQL functionality I’d recommend instantiating a 
Hive context. And Spark is able to spill to disk --> 
https://spark.apache.org/faq.html

There are multiple companies running spark within their data warehouse 
solutions: 
https://ibmdatawarehousing.wordpress.com/2016/10/12/steinbach_dashdb_local_spark/

Edmunds used Spark to allow business analysts to point Spark to files in S3 and 
infer schema: https://www.youtube.com/watch?v=gsR1ljgZLq0

Recommend running some benchmarks and testing query scenarios for your end 
users; but it sounds like you’ll be using it for exploratory analysis. Spark is 
great for this ☺

-Pat


From: Vadim Semenov 
Date: Sunday, November 12, 2017 at 1:06 PM
To: Gourav Sengupta 
Cc: Phillip Henry , ashish rawat 
, Jörn Franke , Deepak Sharma 
, spark users 
Subject: Re: Spark based Data Warehouse

It's actually quite simple to answer

> 1. Is Spark SQL and UDF, able to handle all the workloads?
Yes

> 2. What user interface did you provide for data scientist, data engineers and 
> analysts
Home-grown platform, EMR, Zeppelin

> What are the challenges in running concurrent queries, by many users, over 
> Spark SQL? Considering Spark still does not provide spill to disk, in many 
> scenarios, are there frequent query failures when executing concurrent queries
You can run separate Spark Contexts, so jobs will be isolated

> Are there any open source implementations, which provide something similar?
Yes, many.


On Sun, Nov 12, 2017 at 1:47 PM, Gourav Sengupta 
> wrote:
Dear Ashish,
what you are asking for involves at least a few weeks of dedicated 
understanding of your used case and then it takes at least 3 to 4 months to 
even propose a solution. You can even build a fantastic data warehouse just 
using C++. The matter depends on lots of conditions. I just think that your 
approach and question needs a lot of modification.

Regards,
Gourav

On Sun, Nov 12, 2017 at 6:19 PM, Phillip Henry 
> wrote:
Hi, Ashish.
You are correct in saying that not *all* functionality of Spark is 
spill-to-disk but I am not sure how this pertains to a "concurrent user 
scenario". Each executor will run in its own JVM and is therefore isolated from 
others. That is, if the JVM of one user dies, this should not effect another 
user who is running their own jobs in their own JVMs. The amount of resources 
used by a user can be controlled by the resource manager.
AFAIK, you configure something like YARN to limit the number of cores and the 
amount of memory in the cluster a certain user or group is allowed to use for 
their job. This is obviously quite a coarse-grained approach as (to my 
knowledge) IO is not throttled. I believe people generally use something like 
Apache Ambari to keep an eye on network and disk usage to mitigate problems in 
a shared cluster.

If the user has badly designed their query, it may very well fail with OOMEs 
but this can happen irrespective of whether one user or many is using the 
cluster at a given moment in time.

Does this help?
Regards,
Phillip

On Sun, Nov 12, 2017 at 5:50 PM, ashish rawat 
> wrote:
Thanks Jorn and Phillip. My question was specifically to anyone who have tried 
creating a system using spark SQL, as Data Warehouse. I was trying to check, if 
someone has tried it and they can help with the kind of workloads which worked 
and the ones, which have problems.

Regarding spill to disk, I might be wrong but not all functionality of spark is 
spill to disk. So it still doesn't provide DB like reliability in execution. In 
case of DBs, queries get slow but they don't fail or go out of memory, 
specifically in concurrent user scenarios.

Regards,
Ashish

On Nov 12, 2017 3:02 PM, "Phillip Henry" 
> wrote:
Agree with Jorn. The answer is: it depends.

In the past, I've worked with data scientists who are happy to use the Spark 
CLI. Again, the answer is "it depends" (in this case, on the skills of your 
customers).
Regarding sharing resources, different teams were limited to their own queue so 
they could not hog all the resources. However, people within a team had to do 
some horse trading if they had a particularly intensive job to run. I did feel 
that this was an 

Re: CSV write to S3 failing silently with partial completion

2017-09-07 Thread Patrick Alwell
Sounds like an S3 bug. Can you replicate locally with HDFS?

Try using S3a protocol too; there is a jar you can leverage like so: 
spark-submit --packages 
com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 
my_spark_program.py

EMR can sometimes be buggy. :/

You could also try leveraging EC2 nodes and manually creating a cluster with 
password less SSH.

But I feel your pain man, I’ve had weird issues with Redshift and EMR as well.

Let me know if you can or can’t replicate locally; and I can bring it up with 
our S3 team for the next release of HDP and we can file a bug with AWS.

-Pat

On 9/7/17, 2:59 AM, "JG Perrin"  wrote:

Are you assuming that all partitions are of equal size? Did you try with 
more partitions (like repartitioning)? Does the error always happen with the 
last (or smaller) file? If you are sending to redshift, why not use the JDBC 
driver?

-Original Message-
From: abbim [mailto:ab...@amazon.com] 
Sent: Thursday, September 07, 2017 1:02 AM
To: user@spark.apache.org
Subject: CSV write to S3 failing silently with partial completion

Hi all,
My team has been experiencing a recurring unpredictable bug where only a 
partial write to CSV in S3 on one partition of our Dataset is performed. For 
example, in a Dataset of 10 partitions written to CSV in S3, we might see 9 of 
the partitions as 2.8 GB in size, but one of them as 1.6 GB. However, the job 
does not exit with an error code.

This becomes problematic in the following ways:
1. When we copy the data to Redshift, we get a bad decrypt error on the 
partial file, suggesting that the failure occurred at a weird byte in the file. 
2. We lose data - sometimes as much as 10%.

We don't see this problem with parquet format, which we also use, but 
moving all of our data to parquet is not currently feasible. We're using the 
Java API with Spark 2.2 and Amazon EMR 5.8, code is a simple as this:
df.write().csv("s3://some-bucket/some_location"). We're experiencing the 
issue 1-3x/week on a daily job and are unable to reliably reproduce the 
problem. 

Any thoughts on why we might be seeing this and how to resolve?
Thanks in advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

__
This electronic transmission and any documents accompanying this electronic 
transmission contain confidential information belonging to the sender.  This 
information may contain confidential health information that is legally 
privileged.  The information is intended only for the use of the individual or 
entity named above.  The authorized recipient of this transmission is 
prohibited from disclosing this information to any other party unless required 
to do so by law or regulation and is required to delete or destroy the 
information after its stated need has been fulfilled.  If you are not the 
intended recipient, you are hereby notified that any disclosure, copying, 
distribution or the taking of any action in reliance on or regarding the 
contents of this electronically transmitted information is strictly prohibited. 
 If you have received this E-mail in error, please notify the sender and delete 
this message immediately.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: How to authenticate to ADLS from within spark job on the fly

2017-08-19 Thread Patrick Alwell
This might help; I’ve built a REST API with livyServer: 
https://livy.incubator.apache.org/



From: Steve Loughran 
Date: Saturday, August 19, 2017 at 7:05 AM
To: Imtiaz Ahmed 
Cc: "user@spark.apache.org" 
Subject: Re: How to authenticate to ADLS from within spark job on the fly


On 19 Aug 2017, at 02:42, Imtiaz Ahmed 
> wrote:

Hi All,
I am building a spark library which developers will use when writing their 
spark jobs to get access to data on Azure Data Lake. But the authentication 
will depend on the dataset they ask for. I need to call a rest API from within 
spark job to get credentials and authenticate to read data from ADLS. Is that 
even possible? I am new to spark.
E.g, from inside a spark job a user will say:

MyCredentials myCredentials = MyLibrary.getCredentialsForPath(userId, 
"/some/path/on/azure/datalake");

then before spark.read.json("adl://examples/src/main/resources/people.json")
I need to authenticate the user to be able to read that path using the 
credentials fetched above.

Any help is appreciated.

Thanks,
Imtiaz

The ADL filesystem supports addDelegationTokens(); allowing the caller to 
collect the delegation tokens of the current authenticated user & then pass it 
along with the request —which is exactly what spark should be doing in spark 
submit.

if you want to do it yourself, look in SparkHadoopUtils (I think; IDE is closed 
right now) & see how the tokens are picked up and then passed around 
(marshalled over the job request, unmarshalled after & picked up, with bits of 
the UserGroupInformation class doing the low level work)

Java code snippet to write to the path tokenFile:

FileSystem fs = FileSystem.get(conf);
Credentials cred = new Credentials();
Token tokens[] = fs.addDelegationTokens(renewer, cred);
cred.writeTokenStorageFile(tokenFile, conf);

you can then read that file in elsewhere, and then (somehow) get the FS to use 
those toakens

otherwise, ADL supports Oauth, so you may be able to use any Oauth libraries 
for this. hadoop-azure-dalalake pulls in okhttp for that,

 
  com.squareup.okhttp
  okhttp
  2.4.0


-Steve



Re: GC overhead exceeded

2017-08-18 Thread Patrick Alwell
+1 what is the executor memory? You may need to adjust executor memory and 
cores. For the sake of simplicity; each executor can handle 5 concurrent tasks 
and should have 5 cores. So if your cluster has 100 cores, you’d have 20 
executors. And if your cluster memory is 500gb, each executor would have  25gb 
of memory.

What’s more, you can use tools like the Spark UI or Ganglia to determine which 
step is failing and why. What is the overall cluster size? How many executors 
do you have? Is it an appropriate count for this cluster’s cores? I’m assuming 
you are using YARN?

-Pat

From: KhajaAsmath Mohammed 
Date: Friday, August 18, 2017 at 5:30 AM
To: Pralabh Kumar 
Cc: "user @spark" 
Subject: Re: GC overhead exceeded

It is just a sql from hive table with transformation if adding 10 more columns 
calculated for currency. Input size for this query is 2 months which has around 
450gb data.

I added persist but it didn't help. Also the executor memory is 8g . Any 
suggestions please ?

Sent from my iPhone

On Aug 17, 2017, at 11:43 PM, Pralabh Kumar 
> wrote:
what's is your exector memory , please share the code also

On Fri, Aug 18, 2017 at 10:06 AM, KhajaAsmath Mohammed 
> wrote:

HI,

I am getting below error when running spark sql jobs. This error is thrown 
after running 80% of tasks. any solution?

spark.storage.memoryFraction=0.4
spark.sql.shuffle.partitions=2000
spark.default.parallelism=100
#spark.eventLog.enabled=false
#spark.scheduler.revive.interval=1s
spark.driver.memory=8g


java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.ArrayList.subList(ArrayList.java:955)
at java.lang.String.split(String.java:2311)
at 
sun.net.util.IPAddressUtil.textToNumericFormatV4(IPAddressUtil.java:47)
at java.net.InetAddress.getAllByName(InetAddress.java:1129)
at java.net.InetAddress.getAllByName(InetAddress.java:1098)
at java.net.InetAddress.getByName(InetAddress.java:1048)
at 
org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:562)
at 
org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:579)
at 
org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109)
at 
org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
at 
org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
at 
org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:37)
at 
org.apache.spark.scheduler.TaskSetManager.dequeueTask(TaskSetManager.scala:380)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:433)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:276)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:271)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:357)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:355)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:355)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:352)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:352)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:222)