Task Id : Status Failed

2019-06-06 Thread dimitris plakas
Hello Everyone,

I am trying to set up a YARN cluster with three nodes (one master and two
workers). I followed this tutorial:
https://linode.com/docs/databases/hadoop/how-to-install-and-set-up-hadoop-cluster/


I also tried to execute the YARN example at the end of this tutorial, the
wordcount. After executing hadoop-mapreduce-examples-2.8.5.jar I get
STATUS: FAILED for every Task Id, although the example finished without
any error.

Does the FAILED status mean that an error occurred during the YARN job
execution? If so, could you explain what exactly this error is?

In the attachment you will find the output that I get on my screen.

Thank you in advance,
Dimitris Plakas
19/06/06 23:46:20 INFO client.RMProxy: Connecting to ResourceManager at node-master/192.168.0.1:8032
19/06/06 23:46:22 INFO input.FileInputFormat: Total input files to process : 3
19/06/06 23:46:23 INFO mapreduce.JobSubmitter: number of splits:3
19/06/06 23:46:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559847675487_0010
19/06/06 23:46:24 INFO impl.YarnClientImpl: Submitted application application_1559847675487_0010
19/06/06 23:46:24 INFO mapreduce.Job: The url to track the job: http://node-master:8088/proxy/application_1559847675487_0010/
19/06/06 23:46:24 INFO mapreduce.Job: Running job: job_1559847675487_0010
19/06/06 23:46:38 INFO mapreduce.Job: Job job_1559847675487_0010 running in uber mode : false
19/06/06 23:46:38 INFO mapreduce.Job:  map 0% reduce 0%
19/06/06 23:46:48 INFO mapreduce.Job:  map 33% reduce 0%
19/06/06 23:46:55 INFO mapreduce.Job: Task Id : attempt_1559847675487_0010_m_02_0, Status : FAILED
19/06/06 23:46:55 INFO mapreduce.Job: Task Id : attempt_1559847675487_0010_m_01_0, Status : FAILED
19/06/06 23:47:04 INFO mapreduce.Job: Task Id : attempt_1559847675487_0010_r_00_0, Status : FAILED
19/06/06 23:47:05 INFO mapreduce.Job:  map 67% reduce 0%
19/06/06 23:47:11 INFO mapreduce.Job: Task Id : attempt_1559847675487_0010_m_01_1, Status : FAILED
19/06/06 23:47:21 INFO mapreduce.Job: Task Id : attempt_1559847675487_0010_r_00_1, Status : FAILED
19/06/06 23:47:25 INFO mapreduce.Job: Task Id : attempt_1559847675487_0010_m_01_2, Status : FAILED
19/06/06 23:47:44 INFO mapreduce.Job:  map 67% reduce 22%
19/06/06 23:47:45 INFO mapreduce.Job:  map 100% reduce 100%
19/06/06 23:47:46 INFO mapreduce.Job: Job job_1559847675487_0010 failed with state FAILED due to: Task failed task_1559847675487_0010_m_01
Job failed as tasks failed. failedMaps:1 failedReduces:0

19/06/06 23:47:46 INFO mapreduce.Job: Counters: 42
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=1078223
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=394942
HDFS: Number of bytes written=0
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Failed map tasks=5
Failed reduce tasks=2
Killed map tasks=1
Killed reduce tasks=1
Launched map tasks=7
Launched reduce tasks=3
Other local map tasks=4
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=359388
Total time spent by all reduces in occupied slots (ms)=191720
Total time spent by all map tasks (ms)=89847
Total time spent by all reduce tasks (ms)=47930
Total vcore-milliseconds taken by all map tasks=89847
Total vcore-milliseconds taken by all reduce tasks=47930
Total megabyte-milliseconds taken by all map tasks=46001664
Total megabyte-milliseconds taken by all reduce tasks=24540160
Map-Reduce Framework
Map input records=2989
Map output records=7432
Map output bytes=746802
Map output materialized bytes=762467
Input split bytes=240
Combine input records=7432
Combine output records=7283
Spilled Records=7283
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=400
CPU time spent (ms)=5430
Physical memory (bytes) snapshot=554012672
Virtual memory (bytes) snapshot=3930853376
Total committed heap usage (bytes)=408944640
File Input Format Counters
Bytes Read=394702


Spark on Kubernetes Authentication error

2019-06-06 Thread Nick Dawes
Hi there,

I'm trying to run Spark on EKS. I created an EKS cluster, added nodes, and am
now trying to submit a Spark job from an EC2 instance.

I ran the following commands to set up access:

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=admin \
  --serviceaccount=default:spark --namespace=default

spark-submit command used:

bin/spark-submit \
  --master k8s://https://XX.us-east-1.eks.amazonaws.com \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=k8sspark:latest \
  --conf spark.kubernetes.authenticate.submission.caCertFile=ca.pem \
  local:usr/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar 10

log4j:WARN No appenders could be found for logger (io.fabric8.kubernetes.client.Config).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/06/06 16:03:50 WARN WatchConnectionManager: Executor didn't terminate in time after shutdown in close(), killing it in: io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@5b43fbf6
Exception in thread "main"
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
POST at:
https://X.us-east-1.eks.amazonaws.com/api/v1/namespaces/default/pods
.
Message: pods is forbidden: User "system:anonymous" cannot create resource
"pods" in API group "" in the namespace "default". Received status:
Status(apiVersion=v1, code=403, details=StatusDetails(causes=[],
group=null, kind=pods, name=null, retryAfterSeconds=null, uid=null,
additionalProperties={}), kind=Status, message=pods is forbidden: User
"system:anonymous" cannot create resource "pods" in API group "" in the
namespace "default", metadata=ListMeta(_continue=null,
resourceVersion=null, selfLink=null, additionalProperties={}),
reason=Forbidden, status=Failure, additionalProperties={}). at
io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:478)
at
io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:417)
at
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:381)
at
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:344)
at
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:227)
at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:787)
at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:357)
at
org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:141)
at
org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:140)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543) at
org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:140)
at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:250)
at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:241)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543) at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:241)
at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:204)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195) at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933) at
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 19/06/06
16:03:50 INFO ShutdownHookManager: Shutdown hook called 19/06/06 16:03:50
INFO ShutdownHookManager: Deleting directory
/tmp/spark-0060fe01-33eb-4cb4-b96b-d5be687016bc

I tried creating a different clusterrole with admin privileges, but it did
not work.

Any idea how to fix this one? Thanks.


- Nick


Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Bruno Nassivet
Hi Marcelo,

Maybe spark.sql.functions.explode gives you what you need?
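
One way this might look, as a minimal sketch against the denormalized_cities
frame from the original question (collect the cities per country into an
array, tag each group with an id, then explode back into child rows):

import org.apache.spark.sql.functions._

// parent rows: one array of (CITY, CITY_NICKNAME) structs per country, plus an id
val countries = denormalized_cities
  .groupBy("COUNTRY")
  .agg(collect_list(struct(col("CITY"), col("CITY_NICKNAME"))).as("cities"))
  .withColumn("COUNTRY_ID", monotonically_increasing_id())

// child rows: explode the array back out, keeping the parent id
val cities = countries
  .select(col("COUNTRY_ID"), explode(col("cities")).as("c"))
  .select(col("COUNTRY_ID"),
          monotonically_increasing_id().as("ID"),
          col("c.CITY"),
          col("c.CITY_NICKNAME"))

// the parent table itself would then be countries.select("COUNTRY_ID", "COUNTRY")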

// Bruno


> On 6 Jun 2019, at 16:02, Marcelo Valle  wrote:
> 
> Generating the city id (child) is easy, monotonically increasing id worked 
> for me. 
> 
> The problem is the country (parent) which has to be in both countries and 
> cities data frames.
> 
> 
> 
> On Thu, 6 Jun 2019 at 14:57, Magnus Nilsson  > wrote:
> Well, you could do a repartition on cityname/nrOfCities and use the 
> spark_partition_id function or the mappartitionswithindex dataframe method to 
> add a city Id column. Then just split the dataframe into two subsets. Be 
> careful of hashcollisions on the reparition Key though, or more than one city 
> might end up in the same partition (you can use a custom partitioner).
> 
> It all depends on what kind of Id you want/need for the city value. I.e. will 
> you later need to append new city Id:s or not. Do you always handle the 
> entire dataset when you make this change or not.
> 
> On the other hand, getting a distinct list of citynames is a non shuffling 
> fast operation, add a row_number column and do a broadcast join with the 
> original dataset and then split into two subsets. Probably a bit faster than 
> reshuffling the entire dataframe. As always the proof is in the pudding.
> 
> //Magnus
> 
> On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle  > wrote:
> Akshay, 
> 
> First of all, thanks for the answer. I *am* using monotonically increasing 
> id, but that's not my problem. 
> My problem is I want to output 2 tables from 1 data frame, 1 parent table 
> with ID for the group by and 1 child table with the parent id without the 
> group by.
> 
> I was able to solve this problem by grouping by, generating a parent data 
> frame with an id, then joining the parent dataframe with the original one to 
> get a child dataframe with a parent id. 
> 
> I would like to find a solution without this second join, though.
> 
> Thanks,
> Marcelo.
> 
> 
> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj  > wrote:
> Hi Marcelo,
> 
> If you are using spark 2.3+ and dataset API/SparkSQL,you can use this inbuilt 
> function "monotonically_increasing_id" in Spark.
> A little tweaking using Spark sql inbuilt functions can enable you to achieve 
> this without having to write code or define RDDs with map/reduce functions.
> 
> Akshay Bhardwaj
> +91-97111-33849
> 
> 
> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle  > wrote:
> Hi all, 
> 
> I am new to spark and I am trying to write an application using dataframes 
> that normalize data. 
> 
> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY, CITY, 
> CITY_NICKNAME
> 
> Here is what I want to do: 
> 
> Map by country, then for each country generate a new ID and write to a new 
> dataframe `countries`, which would have COUNTRY_ID, COUNTRY - country ID 
> would be generated, probably using `monotonically_increasing_id`.
> For each country, write several lines on a new dataframe `cities`, which 
> would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be the same 
> generated on country table and ID would be another ID I generate. 
> What's the best way to do this, hopefully using only dataframes (no low level 
> RDDs) unless it's not possible?
> 
> I clearly see a MAP/Reduce process where for each KEY mapped I generate a row 
> in countries table with COUNTRY_ID and for every value I write a row in 
> cities table. But how to implement this in an easy and efficient way? 
> 
> I thought about using a `GroupBy Country` and then using `collect` to collect 
> all values for that country, but then I don't know how to generate the 
> country id and I am not sure about memory efficiency of `collect` for a 
> country with too many cities (bare in mind country/city is just an example, 
> my real entities are different).
> 
> Could anyone point me to the direction of a good solution?
> 
> Thanks,
> Marcelo.
> 
> This email is confidential [and may be protected by legal privilege]. If you 
> are not the intended recipient, please do not copy or disclose its content 
> but contact the sender immediately upon receipt.
> 
> KTech Services Ltd is registered in England as company number 10704940.
> 
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, United 
> Kingdom
> 
> 
> This email is confidential [and may be protected by legal privilege]. If you 
> are not the intended recipient, please do not copy or disclose its content 
> but contact the sender immediately upon receipt.
> 
> KTech Services Ltd is registered in England as company number 10704940.
> 
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, United 
> Kingdom
> 
> 
> This email is confidential [and may be protected by legal privilege]. If you 
> are not the intended recipient, please do not copy or disclose its content 
> but contact the sender imme

Fwd: [Spark SQL Thrift Server] Persistence errors with PostgreSQL and MySQL in 2.4.3

2019-06-06 Thread Ricardo Martinelli de Oliveira
Hello,

I'm running the Thrift server with PostgreSQL persistence for the Hive
metastore. I'm using Postgres 9.6 and Spark 2.4.3 in this environment.

When I start the Thrift server I get lots of errors while creating the
schema, and it happens every time I hit Postgres, like:

19/06/06 15:51:59 WARN Datastore: Error initialising derby schema : ERROR:
syntax error at or near "NAME"
  Position: 67
org.postgresql.util.PSQLException: ERROR: syntax error at or near "NAME"
  Position: 67

org.datanucleus.exceptions.NucleusException: JDBC type LONGVARCHAR declared
for field "org.apache.hadoop.hive.metastore.model.MTable.viewExpandedText"
of java type java.lang.String cant be mapped for this datastore.

And then I get many times this other error:

Failed to generate new Mapping of type
org.datanucleus.store.rdbms.mapping.datastore.CharRDBMSMapping, exception :
String max length of 1 is outside the acceptable range [0, 0] for column
""IS_COMPRESSED""
String max length of 1 is outside the acceptable range [0, 0] for column
""IS_COMPRESSED""

My suspicion is a version mismatch between the Spark persistence libraries,
the JDBC driver version (42.2.5), and Postgres, but the versions are the
same as in Spark 2.2.3 (where I have a working environment).

Anyone have any ideas about what can cause these issues?

-- 
Ricardo Martinelli De Oliveira
Data Engineer, AI CoE
Red Hat Brazil
Av. Brigadeiro Faria Lima, 3900, 8th floor
rmart...@redhat.com | T: +551135426125 | M: +5511970696531
@redhatjobs




Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Marcelo Valle
Hi Magnus, Thanks for replying.

I didn't quite get the partition solution, to be honest, but indeed, I was
trying to figure out a way of solving this with data frames only, without
rejoining.

I can't have a global list of countries in my real scenario, as the real
scenario is not reference data; countries were just an example.

It would be nice if the DataFrame API could evolve in the future to handle
this use case. This doesn't seem to be an isolated case, at least at the
company I work for.

Anyway, the question is answered. I wanted to hear from someone more
experienced in Spark that there was no better way of doing it using data
frames only - I am still at the beginning of my journey with it - thanks for
the help!

Thanks,
Marcelo.


On Thu, 6 Jun 2019 at 15:18, Magnus Nilsson  wrote:

> Sorry, misread. Just change city to country in my previous answer. I still
> believe the most efficient way to do this is to create a small dataset with
> country name and id columns and broadcast it to join with the city dataset.
> Other than that, you have to bring all the cities from the same country into
> the same partition and do a partition-based operation.
>
> If you want to do neither, you need to be able to determine the country id
> from the string representation of the name, make it deterministic, and make
> sure your algorithm doesn't collide.
>
> I don't see any other way to do it. I still vote for doing a distinct
> collect, adding an id, and doing a broadcast join, unless your dataset is
> pre-bucketed along non-colliding country name lines, in which case the
> partition-based solution is probably faster. Or better yet, pre-create a
> list of all the world's countries with an id and do a broadcast join
> straight away.
>
>
> Regards,
>
> Magnus
> --
> *From:* Marcelo Valle 
> *Sent:* Thursday, June 6, 2019 16:02
> *To:* Magnus Nilsson
> *Cc:* user @spark
> *Subject:* Re: adding a column to a groupBy (dataframe)
>
> Generating the city id (child) is easy, monotonically increasing id worked
> for me.
>
> The problem is the country (parent) which has to be in both countries and
> cities data frames.
>
>
>
> On Thu, 6 Jun 2019 at 14:57, Magnus Nilsson  wrote:
>
> Well, you could do a repartition on cityname/nrOfCities and use the
> spark_partition_id function or the mappartitionswithindex dataframe method
> to add a city Id column. Then just split the dataframe into two subsets. Be
> careful of hashcollisions on the reparition Key though, or more than one
> city might end up in the same partition (you can use a custom partitioner).
>
> It all depends on what kind of Id you want/need for the city value. I.e.
> will you later need to append new city Id:s or not. Do you always handle
> the entire dataset when you make this change or not.
>
> On the other hand, getting a distinct list of citynames is a non shuffling
> fast operation, add a row_number column and do a broadcast join with the
> original dataset and then split into two subsets. Probably a bit faster
> than reshuffling the entire dataframe. As always the proof is in the
> pudding.
>
> //Magnus
>
> On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
> wrote:
>
> Akshay,
>
> First of all, thanks for the answer. I *am* using monotonically increasing
> id, but that's not my problem.
> My problem is I want to output 2 tables from 1 data frame, 1 parent table
> with ID for the group by and 1 child table with the parent id without the
> group by.
>
> I was able to solve this problem by grouping by, generating a parent data
> frame with an id, then joining the parent dataframe with the original one
> to get a child dataframe with a parent id.
>
> I would like to find a solution without this second join, though.
>
> Thanks,
> Marcelo.
>
>
> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
> Hi Marcelo,
>
> If you are using spark 2.3+ and dataset API/SparkSQL,you can use this
> inbuilt function "monotonically_increasing_id" in Spark.
> A little tweaking using Spark sql inbuilt functions can enable you to
> achieve this without having to write code or define RDDs with map/reduce
> functions.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
> wrote:
>
> Hi all,
>
> I am new to spark and I am trying to write an application using dataframes
> that normalize data.
>
> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
> CITY, CITY_NICKNAME
>
> Here is what I want to do:
>
>
>1. Map by country, then for each country generate a new ID and write
>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>country ID would be generated, probably using 
> `monotonically_increasing_id`.
>2. For each country, write several lines on a new dataframe `cities`,
>which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
>the same generated on country table and ID would be another ID I generate.
>
> What's the best way to do this, hopefully using only data

Multi-dimensional aggregations in Structured Streaming

2019-06-06 Thread Symeon Meichanetzoglou
Hi all,

We are facing a challenge where a simple use case does not seem trivial to
implement in Structured Streaming: an aggregation should be calculated, and
then some other aggregations should further aggregate on top of the first
one. Something like:
1st aggregation: val df = dfIn.groupBy(a,b,c,d).agg(sum(size))
2nd aggregation: df.groupBy(a,b,c).agg(sum(size))
3rd aggregation: df.groupBy(a,b,d).agg(sum(size))

My initial idea was to do it as below, but it is wrong because you end up
overwriting rows in each mini-batch since the state is not kept:

// df is the output of the first aggregation
df.writeStream.outputMode(OutputMode.Update).foreachBatch {
  (batchDf: DataFrame, batchId: Long) =>
    write(batchDf)
    // furtherAggregates contains the 2nd and 3rd aggregations
    furtherAggregates.foreach(agg => write(agg.compute(batchDf)))
}

It doesn't seem feasible to achieve the goal using foreachBatch. I see
two possible solutions:
1. Run 3 independent queries in parallel, all reading the same input.
This is very inefficient as the first aggregation contains some
expensive processing (e.g. lookup on a broadcast variable and update
of a column).
2. Calculate the first aggregation and write it to Kafka. Then run the
2 further aggregations independently, reading the Kafka topic that was
written by the first aggregation. The problem with this is that the first
aggregation writes in update mode (we cannot use append because we cannot
wait for the watermark to expire), so in Kafka we will end up with many
updates for a single aggregated row. Is it the responsibility of the later
aggregations to resolve this? Maybe by keeping only the record with the
latest timestamp in the mini-batch (per set of aggregated columns)?
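
A rough sketch of what I have in mind (assuming the first aggregation also
writes a timestamp column, called "ts" here, alongside the grouping columns
a, b, c, d):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// keep only the most recent update per aggregation key within the micro-batch
def latestPerKey(batchDf: DataFrame): DataFrame = {
  val w = Window.partitionBy("a", "b", "c", "d").orderBy(col("ts").desc)
  batchDf
    .withColumn("rn", row_number().over(w))
    .where(col("rn") === 1)
    .drop("rn")
}

Each downstream job would apply this inside its foreachBatch before computing
its own groupBy.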

To me, solution 2 sounds like the way to go if the issue I mentioned can be
resolved. What do you think?

Thanks!

Symeon

PS: I found a similar question here:
https://stackoverflow.com/questions/41011002/multiple-aggregations-in-spark-structured-streaming




Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Marcelo Valle
Generating the city id (child) is easy; monotonically increasing id worked
for me.

The problem is the country (parent), which has to be in both the countries
and the cities data frames.



On Thu, 6 Jun 2019 at 14:57, Magnus Nilsson  wrote:

> Well, you could do a repartition on cityname/nrOfCities and use the
> spark_partition_id function or the mappartitionswithindex dataframe method
> to add a city Id column. Then just split the dataframe into two subsets. Be
> careful of hashcollisions on the reparition Key though, or more than one
> city might end up in the same partition (you can use a custom partitioner).
>
> It all depends on what kind of Id you want/need for the city value. I.e.
> will you later need to append new city Id:s or not. Do you always handle
> the entire dataset when you make this change or not.
>
> On the other hand, getting a distinct list of citynames is a non shuffling
> fast operation, add a row_number column and do a broadcast join with the
> original dataset and then split into two subsets. Probably a bit faster
> than reshuffling the entire dataframe. As always the proof is in the
> pudding.
>
> //Magnus
>
> On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
> wrote:
>
>> Akshay,
>>
>> First of all, thanks for the answer. I *am* using monotonically
>> increasing id, but that's not my problem.
>> My problem is I want to output 2 tables from 1 data frame, 1 parent table
>> with ID for the group by and 1 child table with the parent id without the
>> group by.
>>
>> I was able to solve this problem by grouping by, generating a parent data
>> frame with an id, then joining the parent dataframe with the original one
>> to get a child dataframe with a parent id.
>>
>> I would like to find a solution without this second join, though.
>>
>> Thanks,
>> Marcelo.
>>
>>
>> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Marcelo,
>>>
>>> If you are using spark 2.3+ and dataset API/SparkSQL,you can use this
>>> inbuilt function "monotonically_increasing_id" in Spark.
>>> A little tweaking using Spark sql inbuilt functions can enable you to
>>> achieve this without having to write code or define RDDs with map/reduce
>>> functions.
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
>>> wrote:
>>>
 Hi all,

 I am new to spark and I am trying to write an application using
 dataframes that normalize data.

 So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
 CITY, CITY_NICKNAME

 Here is what I want to do:


1. Map by country, then for each country generate a new ID and
write to a new dataframe `countries`, which would have COUNTRY_ID, 
 COUNTRY
- country ID would be generated, probably using
`monotonically_increasing_id`.
2. For each country, write several lines on a new dataframe
`cities`, which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. 
 COUNTRY_ID
would be the same generated on country table and ID would be another ID 
 I
generate.

 What's the best way to do this, hopefully using only dataframes (no low
 level RDDs) unless it's not possible?

 I clearly see a MAP/Reduce process where for each KEY mapped I generate
 a row in countries table with COUNTRY_ID and for every value I write a row
 in cities table. But how to implement this in an easy and efficient way?

 I thought about using a `GroupBy Country` and then using `collect` to
 collect all values for that country, but then I don't know how to generate
 the country id and I am not sure about memory efficiency of `collect` for a
 country with too many cities (bare in mind country/city is just an example,
 my real entities are different).

 Could anyone point me to the direction of a good solution?

 Thanks,
 Marcelo.

 This email is confidential [and may be protected by legal privilege].
 If you are not the intended recipient, please do not copy or disclose its
 content but contact the sender immediately upon receipt.

 KTech Services Ltd is registered in England as company number 10704940.

 Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
 United Kingdom

>>>
>> This email is confidential [and may be protected by legal privilege]. If
>> you are not the intended recipient, please do not copy or disclose its
>> content but contact the sender immediately upon receipt.
>>
>> KTech Services Ltd is registered in England as company number 10704940.
>>
>> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
>> United Kingdom
>>
>

This email is confidential [and may be protected by legal privilege]. If you 
are not the intended recipient, please do not copy or disclose its content but 
contact the sender immediately upon receipt.

KTech Services Ltd is registered in E

Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Magnus Nilsson
Well, you could do a repartition on cityname/nrOfCities and use the
spark_partition_id function or the mapPartitionsWithIndex method to add a
city id column, then just split the dataframe into two subsets. Be careful
of hash collisions on the repartition key though, or more than one city
might end up in the same partition (you can use a custom partitioner).

It all depends on what kind of id you want/need for the city value, i.e.
whether you will later need to append new city ids or not, and whether you
always handle the entire dataset when you make this change or not.

On the other hand, getting a distinct list of city names is a fast,
non-shuffling operation: add a row_number column, do a broadcast join with
the original dataset, and then split into two subsets. Probably a bit faster
than reshuffling the entire dataframe. As always, the proof is in the
pudding.
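
A rough sketch of that last approach, with column names taken from the
original question (the unpartitioned window is acceptable here only because
the distinct country list is small):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// small parent table: distinct countries with a dense row_number id
val countries = denormalized_cities
  .select("COUNTRY").distinct()
  .withColumn("COUNTRY_ID", row_number().over(Window.orderBy("COUNTRY")))

// broadcast join back to build the child table, plus a generated city id
val cities = denormalized_cities
  .join(broadcast(countries), Seq("COUNTRY"))
  .withColumn("ID", monotonically_increasing_id())
  .select("COUNTRY_ID", "ID", "CITY", "CITY_NICKNAME")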

//Magnus

On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
wrote:

> Akshay,
>
> First of all, thanks for the answer. I *am* using monotonically increasing
> id, but that's not my problem.
> My problem is I want to output 2 tables from 1 data frame, 1 parent table
> with ID for the group by and 1 child table with the parent id without the
> group by.
>
> I was able to solve this problem by grouping by, generating a parent data
> frame with an id, then joining the parent dataframe with the original one
> to get a child dataframe with a parent id.
>
> I would like to find a solution without this second join, though.
>
> Thanks,
> Marcelo.
>
>
> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Marcelo,
>>
>> If you are using spark 2.3+ and dataset API/SparkSQL,you can use this
>> inbuilt function "monotonically_increasing_id" in Spark.
>> A little tweaking using Spark sql inbuilt functions can enable you to
>> achieve this without having to write code or define RDDs with map/reduce
>> functions.
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am new to spark and I am trying to write an application using
>>> dataframes that normalize data.
>>>
>>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>>> CITY, CITY_NICKNAME
>>>
>>> Here is what I want to do:
>>>
>>>
>>>1. Map by country, then for each country generate a new ID and write
>>>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>>>country ID would be generated, probably using 
>>> `monotonically_increasing_id`.
>>>2. For each country, write several lines on a new dataframe
>>>`cities`, which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. 
>>> COUNTRY_ID
>>>would be the same generated on country table and ID would be another ID I
>>>generate.
>>>
>>> What's the best way to do this, hopefully using only dataframes (no low
>>> level RDDs) unless it's not possible?
>>>
>>> I clearly see a MAP/Reduce process where for each KEY mapped I generate
>>> a row in countries table with COUNTRY_ID and for every value I write a row
>>> in cities table. But how to implement this in an easy and efficient way?
>>>
>>> I thought about using a `GroupBy Country` and then using `collect` to
>>> collect all values for that country, but then I don't know how to generate
>>> the country id and I am not sure about memory efficiency of `collect` for a
>>> country with too many cities (bare in mind country/city is just an example,
>>> my real entities are different).
>>>
>>> Could anyone point me to the direction of a good solution?
>>>
>>> Thanks,
>>> Marcelo.
>>>
>>> This email is confidential [and may be protected by legal privilege]. If
>>> you are not the intended recipient, please do not copy or disclose its
>>> content but contact the sender immediately upon receipt.
>>>
>>> KTech Services Ltd is registered in England as company number 10704940.
>>>
>>> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
>>> United Kingdom
>>>
>>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>


Re: Spark on K8S - --packages not working for cluster mode?

2019-06-06 Thread pacuna
Great!

Thanks a lot.

Best,

Pablo.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Marcelo Valle
Akshay,

First of all, thanks for the answer. I *am* using monotonically increasing
id, but that's not my problem.
My problem is that I want to output 2 tables from 1 data frame: 1 parent
table with an id for the group by, and 1 child table carrying that parent id
but without the group by.

I was able to solve this problem by grouping by, generating a parent data
frame with an id, then joining the parent dataframe with the original one
to get a child dataframe with a parent id.

I would like to find a solution without this second join, though.

Thanks,
Marcelo.


On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj 
wrote:

> Hi Marcelo,
>
> If you are using spark 2.3+ and dataset API/SparkSQL,you can use this
> inbuilt function "monotonically_increasing_id" in Spark.
> A little tweaking using Spark sql inbuilt functions can enable you to
> achieve this without having to write code or define RDDs with map/reduce
> functions.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
> wrote:
>
>> Hi all,
>>
>> I am new to spark and I am trying to write an application using
>> dataframes that normalize data.
>>
>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>> CITY, CITY_NICKNAME
>>
>> Here is what I want to do:
>>
>>
>>1. Map by country, then for each country generate a new ID and write
>>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>>country ID would be generated, probably using 
>> `monotonically_increasing_id`.
>>2. For each country, write several lines on a new dataframe `cities`,
>>which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
>>the same generated on country table and ID would be another ID I generate.
>>
>> What's the best way to do this, hopefully using only dataframes (no low
>> level RDDs) unless it's not possible?
>>
>> I clearly see a MAP/Reduce process where for each KEY mapped I generate a
>> row in countries table with COUNTRY_ID and for every value I write a row in
>> cities table. But how to implement this in an easy and efficient way?
>>
>> I thought about using a `GroupBy Country` and then using `collect` to
>> collect all values for that country, but then I don't know how to generate
>> the country id and I am not sure about memory efficiency of `collect` for a
>> country with too many cities (bare in mind country/city is just an example,
>> my real entities are different).
>>
>> Could anyone point me to the direction of a good solution?
>>
>> Thanks,
>> Marcelo.
>>
>> This email is confidential [and may be protected by legal privilege]. If
>> you are not the intended recipient, please do not copy or disclose its
>> content but contact the sender immediately upon receipt.
>>
>> KTech Services Ltd is registered in England as company number 10704940.
>>
>> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
>> United Kingdom
>>
>

This email is confidential [and may be protected by legal privilege]. If you 
are not the intended recipient, please do not copy or disclose its content but 
contact the sender immediately upon receipt.

KTech Services Ltd is registered in England as company number 10704940.

Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, United 
Kingdom


[no subject]

2019-06-06 Thread Shi Tyshanchn



Re: Spark on K8S - --packages not working for cluster mode?

2019-06-06 Thread Stavros Kontopoulos
Hi,

This has been fixed here: https://github.com/apache/spark/pull/23546. It
will be available in Spark 3.0.0.

Best,
Stavros

On Wed, Jun 5, 2019 at 11:18 PM pacuna  wrote:

> I'm trying to run a sample code that reads a file from s3 so I need the aws
> sdk and aws hadoop dependencies.
> If I assemble these deps into the main jar everything works fine. But when
> I
> try using --packages, the deps are not seen by the pods.
>
> This is my submit command:
>
> spark-submit
> --master k8s://https://xx.xx.xx.xx
> --class "SimpleApp"
> --deploy-mode cluster
> --conf spark.kubernetes.container.image=docker.io/pacuna/spark:0.2
> --conf
> spark.kubernetes.authenticate.driver.serviceAccountName=spark-test-user
> --packages
> com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
> --conf spark.hadoop.fs.s3a.access.key=...
> --conf spark.hadoop.fs.s3a.secret.key=...
> https://x/simple-project_2.11-1.0.jar
>
> And the error I'm getting in the driver pod is:
>
> 19/06/05 20:13:50 ERROR SparkContext: Failed to add
>
> file:///home/dev/.ivy2/jars/com.fasterxml.jackson.core_jackson-core-2.2.3.jar
> to Spark environment
> java.io.FileNotFoundException: Jar
> /home/dev/.ivy2/jars/com.fasterxml.jackson.core_jackson-core-2.2.3.jar not
> found
>
> I'm getting that error for all the deps jars needed.
>
> Any ideas?
>
> Thanks.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Akshay Bhardwaj
Additionally there is "uuid" function available as well if that helps your
use case.
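
For example, a quick sketch of both generators on the frame from the original
question (assuming the built-in SQL uuid() function is what is meant; neither
id is consecutive or stable across recomputations):

import org.apache.spark.sql.functions.{expr, monotonically_increasing_id}

// Long surrogate key
val withLongId = denormalized_cities.withColumn("COUNTRY_ID", monotonically_increasing_id())
// string (UUID) surrogate key
val withUuid = denormalized_cities.withColumn("COUNTRY_ID", expr("uuid()"))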


Akshay Bhardwaj
+91-97111-33849


On Thu, Jun 6, 2019 at 3:18 PM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi Marcelo,
>
> If you are using spark 2.3+ and dataset API/SparkSQL,you can use this
> inbuilt function "monotonically_increasing_id" in Spark.
> A little tweaking using Spark sql inbuilt functions can enable you to
> achieve this without having to write code or define RDDs with map/reduce
> functions.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
> wrote:
>
>> Hi all,
>>
>> I am new to spark and I am trying to write an application using
>> dataframes that normalize data.
>>
>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>> CITY, CITY_NICKNAME
>>
>> Here is what I want to do:
>>
>>
>>1. Map by country, then for each country generate a new ID and write
>>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>>country ID would be generated, probably using 
>> `monotonically_increasing_id`.
>>2. For each country, write several lines on a new dataframe `cities`,
>>which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
>>the same generated on country table and ID would be another ID I generate.
>>
>> What's the best way to do this, hopefully using only dataframes (no low
>> level RDDs) unless it's not possible?
>>
>> I clearly see a MAP/Reduce process where for each KEY mapped I generate a
>> row in countries table with COUNTRY_ID and for every value I write a row in
>> cities table. But how to implement this in an easy and efficient way?
>>
>> I thought about using a `GroupBy Country` and then using `collect` to
>> collect all values for that country, but then I don't know how to generate
>> the country id and I am not sure about memory efficiency of `collect` for a
>> country with too many cities (bare in mind country/city is just an example,
>> my real entities are different).
>>
>> Could anyone point me to the direction of a good solution?
>>
>> Thanks,
>> Marcelo.
>>
>> This email is confidential [and may be protected by legal privilege]. If
>> you are not the intended recipient, please do not copy or disclose its
>> content but contact the sender immediately upon receipt.
>>
>> KTech Services Ltd is registered in England as company number 10704940.
>>
>> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
>> United Kingdom
>>
>


Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Akshay Bhardwaj
Hi Marcelo,

If you are using Spark 2.3+ and the Dataset API/Spark SQL, you can use the
built-in function "monotonically_increasing_id" in Spark.
A little tweaking with Spark SQL built-in functions can enable you to achieve
this without having to write code or define RDDs with map/reduce functions.

Akshay Bhardwaj
+91-97111-33849


On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
wrote:

> Hi all,
>
> I am new to spark and I am trying to write an application using dataframes
> that normalize data.
>
> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
> CITY, CITY_NICKNAME
>
> Here is what I want to do:
>
>
>1. Map by country, then for each country generate a new ID and write
>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>country ID would be generated, probably using 
> `monotonically_increasing_id`.
>2. For each country, write several lines on a new dataframe `cities`,
>which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
>the same generated on country table and ID would be another ID I generate.
>
> What's the best way to do this, hopefully using only dataframes (no low
> level RDDs) unless it's not possible?
>
> I clearly see a MAP/Reduce process where for each KEY mapped I generate a
> row in countries table with COUNTRY_ID and for every value I write a row in
> cities table. But how to implement this in an easy and efficient way?
>
> I thought about using a `GroupBy Country` and then using `collect` to
> collect all values for that country, but then I don't know how to generate
> the country id and I am not sure about memory efficiency of `collect` for a
> country with too many cities (bare in mind country/city is just an example,
> my real entities are different).
>
> Could anyone point me to the direction of a good solution?
>
> Thanks,
> Marcelo.
>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>