Re: adding a column to a groupBy (dataframe)

2019-06-07 Thread Marcelo Valle
Hi Bruno, that's really interesting...

So, to use explode, I would have to do a group by on country and a
collect_all on the cities, then explode the cities, right? Am I understanding
the idea correctly?

I think this could produce the results I want. But what would be the
behaviour under the hood? Does collect_all return an iterator or does it
return a list? If I have a country with too many cities, would my server
have to store all cities of a country in memory?
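
Something like this rough sketch is what I have in mind (assuming collect_list
is the built-in function meant by "collect_all"; denormalizedCities stands for
the denormalized_cities dataframe from my original example, and I left
CITY_NICKNAME out to keep it short):

import org.apache.spark.sql.functions.{col, collect_list, explode, monotonically_increasing_id}

// group the denormalized data by country, keep each country's cities as an
// array, and generate a surrogate COUNTRY_ID per group
val countries = denormalizedCities
  .groupBy("COUNTRY")
  .agg(collect_list("CITY").as("CITIES"))
  .withColumn("COUNTRY_ID", monotonically_increasing_id())

// explode the array back into one row per city, carrying the parent id along
val cities = countries
  .select(col("COUNTRY_ID"), explode(col("CITIES")).as("CITY"))

My worry is that collect_list builds the whole array of cities for a country
at once rather than streaming through them.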





On Thu, 6 Jun 2019 at 20:57, Bruno Nassivet 
wrote:

> Hi Marcelo,
>
> Maybe the spark.sql.functions.explode give what you need?
>
> // Bruno
>
>
> On 6 Jun 2019, at 16:02, Marcelo Valle  wrote:
>
> Generating the city id (child) is easy, monotonically increasing id worked
> for me.
>
> The problem is the country (parent) which has to be in both countries and
> cities data frames.
>
>
>
> On Thu, 6 Jun 2019 at 14:57, Magnus Nilsson  wrote:
>
>> Well, you could do a repartition on cityname/nrOfCities and use the
>> spark_partition_id function or the mapPartitionsWithIndex method to add a
>> city ID column. Then just split the dataframe into two subsets. Be careful
>> of hash collisions on the repartition key though, or more than one city
>> might end up in the same partition (you can use a custom partitioner).
>>
>> It all depends on what kind of ID you want/need for the city value, i.e.
>> whether you will later need to append new city IDs, and whether you always
>> handle the entire dataset when you make this change.
>>
>> On the other hand, getting a distinct list of city names is a non-shuffling,
>> fast operation; add a row_number column and do a broadcast join with the
>> original dataset and then split into two subsets. Probably a bit faster than
>> reshuffling the entire dataframe. As always, the proof is in the pudding.
>>
>> //Magnus
>>
>> On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
>> wrote:
>>
>>> Akshay,
>>>
>>> First of all, thanks for the answer. I *am* using monotonically
>>> increasing id, but that's not my problem.
>>> My problem is that I want to output 2 tables from 1 data frame: 1 parent
>>> table with an ID for the grouped key, and 1 child table carrying that
>>> parent ID, without the group by.
>>>
>>> I was able to solve this problem by grouping by, generating a parent
>>> data frame with an id, then joining the parent dataframe with the original
>>> one to get a child dataframe with a parent id.
>>>
>>> I would like to find a solution without this second join, though.
>>>
>>> Thanks,
>>> Marcelo.
>>>
>>>
>>> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
>>> akshay.bhardwaj1...@gmail.com> wrote:
>>>
 Hi Marcelo,

 If you are using Spark 2.3+ and the Dataset API / Spark SQL, you can use the
 built-in function "monotonically_increasing_id".
 A little tweaking with Spark SQL's built-in functions should let you achieve
 this without having to drop down to RDDs with map/reduce functions.

 Akshay Bhardwaj
 +91-97111-33849


 On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
 wrote:

> Hi all,
>
> I am new to spark and I am trying to write an application using
> dataframes that normalize data.
>
> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
> CITY, CITY_NICKNAME
>
> Here is what I want to do:
>
>
>    1. Map by country, then for each country generate a new ID and write it
>       to a new dataframe `countries`, which would have COUNTRY_ID and
>       COUNTRY. The country ID would be generated, probably using
>       `monotonically_increasing_id`.
>    2. For each country, write several lines to a new dataframe `cities`,
>       which would have COUNTRY_ID, ID, CITY and CITY_NICKNAME. COUNTRY_ID
>       would be the same as generated in the countries table and ID would be
>       another ID I generate.
>
> What's the best way to do this, hopefully using only dataframes (no
> low level RDDs) unless it's not possible?
>
> I clearly see a map/reduce process where for each mapped KEY I generate a
> row in the countries table with a COUNTRY_ID, and for every value I write a
> row in the cities table. But how can I implement this in an easy and
> efficient way?
>
> I thought about using a `GroupBy Country` and then using `collect` to
> collect all values for that country, but then I don't know how to generate
> the country ID, and I am not sure about the memory efficiency of `collect`
> for a country with too many cities (bear in mind country/city is just an
> example; my real entities are different).
>
> Could anyone point me to the direction of a good solution?
>
> Thanks,
> Marcelo.
>

Spark logging questions

2019-06-07 Thread test test
Hello,

How can we dump the Spark driver and executor thread information in the
Spark application logs?


PS: submitting the Spark job using spark-submit

Regards
Rohit


Getting driver logs in Standalone Cluster

2019-06-07 Thread tkrol
Hey Guys, 

I am wondering what is the best way to get logs for the driver in cluster
mode on a standalone cluster? Normally I used to run in client mode, so I
could capture the logs from the console.

Now I've started running jobs in cluster mode, and obviously the driver is
running on a worker so I can't see the logs.

I would like to store the logs (preferably in HDFS). Is there any easy way to
do that?

Thanks






[SQL] Why casting string column to timestamp gives null?

2019-06-07 Thread Jacek Laskowski
Hi,

Why is casting a string column to timestamp not giving the same results as
going through casting to long in-between? I'm tempted to consider it a bug.

scala> spark.version
res4: String = 2.4.3

scala> Seq("1", "2").toDF("ts").select($"ts" cast "timestamp").show
+----+
|  ts|
+----+
|null|
|null|
+----+

scala> Seq("1", "2").toDF("ts").select($"ts" cast "long").select($"ts" cast
"timestamp").show
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 01:00:01|
|1970-01-01 01:00:02|
+-------------------+
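
For comparison, a rough check (not re-run here; spark-shell implicits assumed):
a string that is already in timestamp format casts directly, which makes me
suspect the string cast parses a timestamp literal while the long cast is taken
as seconds since the epoch (the 01:00 offset presumably being my local
timezone).

Seq("2019-06-07 12:00:00").toDF("ts").select($"ts" cast "timestamp").show
// expected: a single row showing 2019-06-07 12:00:00, i.e. a timestamp-formatted
// string parses, while "1" does not and becomes null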

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming
https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski


Re: sparksql in sparkR?

2019-06-07 Thread Felix Cheung
This seems to be more a question about the spark-sql shell? I'd suggest you
change the email title to get more attention.


From: ya 
Sent: Wednesday, June 5, 2019 11:48:17 PM
To: user@spark.apache.org
Subject: sparksql in sparkR?

Dear list,

I am trying to use Spark SQL within R, and I have the following questions.
Could you give me some advice please? Thank you very much.

1. I connect R and Spark using the SparkR library; probably some of the members
here are also R users? Do I understand correctly that Spark SQL can be connected
to and triggered via SparkR and used from within R (not from the sparkR shell of
Spark)?

2. I loaded the SparkR library in R and tried to create a new SQL database and a
table, but I could not get the database and the table I want. The code looks
like below:

library(SparkR)
Sys.setenv(SPARK_HOME='/Users/ya/Downloads/soft/spark-2.4.3-bin-hadoop2.7')
sparkR.session(sparkHome=Sys.getenv('/Users/ya/Downloads/soft/spark-2.4.3-bin-hadoop2.7'))
sql("create database learnsql; use learnsql")
sql("
create table employee_tbl
(emp_id varchar(10) not null,
emp_name char(10) not null,
emp_st_addr char(10) not null,
emp_city char(10) not null,
emp_st char(10) not null,
emp_zip integer(5) not null,
emp_phone integer(10) null,
emp_pager integer(10) null);
insert into employee_tbl values ('0001','john','yanlanjie 
1','gz','jiaoqiaojun','510006','1353');
select*from employee_tbl;
")

I ran the following code in the spark-sql shell. I get the database learnsql;
however, I still can't get the table.

spark-sql> create database learnsql;show databases;
19/06/06 14:42:36 INFO HiveMetaStore: 0: create_database: 
Database(name:learnsql, description:, 
locationUri:file:/Users/ya/spark-warehouse/learnsql.db, parameters:{})
19/06/06 14:42:36 INFO audit: ugi=yaip=unknown-ip-addr  
cmd=create_database: Database(name:learnsql, description:, 
locationUri:file:/Users/ya/spark-warehouse/learnsql.db, parameters:{})
Error in query: org.apache.hadoop.hive.metastore.api.AlreadyExistsException: 
Database learnsql already exists;

spark-sql> create table employee_tbl
 > (emp_id varchar(10) not null,
 > emp_name char(10) not null,
 > emp_st_addr char(10) not null,
 > emp_city char(10) not null,
 > emp_st char(10) not null,
 > emp_zip integer(5) not null,
 > emp_phone integer(10) null,
 > emp_pager integer(10) null);
Error in query:
no viable alternative at input 'create table employee_tbl\n(emp_id varchar(10) 
not'(line 2, pos 20)

== SQL ==
create table employee_tbl
(emp_id varchar(10) not null,
^^^
emp_name char(10) not null,
emp_st_addr char(10) not null,
emp_city char(10) not null,
emp_st char(10) not null,
emp_zip integer(5) not null,
emp_phone integer(10) null,
emp_pager integer(10) null)

spark-sql> insert into employee_tbl values ('0001','john','yanlanjie 
1','gz','jiaoqiaojun','510006','1353');
19/06/06 14:43:43 INFO HiveMetaStore: 0: get_table : db=default tbl=employee_tbl
19/06/06 14:43:43 INFO audit: ugi=yaip=unknown-ip-addr  cmd=get_table : 
db=default tbl=employee_tbl
Error in query: Table or view not found: employee_tbl; line 1 pos 0


Does Spark SQL have a different SQL grammar? What did I miss?

Thank you very much.

Best regards,

YA







Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-07 Thread Chetan Khatri
Hello Dear Spark Users,

I am trying to write data from a Kafka topic to Parquet on HDFS with Structured
Streaming, but I am getting failures. Please do help.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._  // for the 10.seconds trigger interval

val spark: SparkSession =
  SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
import spark.implicits._
val dataFromTopicDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
val topicQuery = dataFromTopicDF.writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()

topicQuery.awaitTermination()
topicQuery.stop()


The above code is working well, but when I am trying to write to Parquet on
HDFS I am getting exceptions.


logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")

val parquetQuery = dataFromTopicDF.writeStream
.format("parquet")
.option("startingOffsets", "earliest")
.option("checkpointLocation", "/tmp/checkpoint")
.option("path", "/sample-topic")
.start()

parquetQuery.awaitTermination()
parquetQuery.stop()


*Exception Details:*


Exception in thread "main" java.io.IOException: mkdir of
/sample-topic/_spark_metadata failed
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
at 
org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:66)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
at 
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
at 
com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
at 
com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Thanks


Re: Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-07 Thread Chetan Khatri
Also, does anyone have an idea how to resolve this issue -
https://stackoverflow.com/questions/56390492/spark-metadata-0-doesnt-exist-while-compacting-batch-9-structured-streaming-er
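
For reference, the variant of the parquet sink I am going to try next (a rough
sketch only, not verified: hdfs://namenode:8020 is just a placeholder for the
real namenode URI, the checkpoint gets its own directory per query, and I
dropped startingOffsets since as far as I know it is a read-side option):

// sketch: fully qualified HDFS paths so the sink and its _spark_metadata
// directory are created on HDFS rather than on the local filesystem,
// plus a dedicated checkpoint location for this query
val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "hdfs://namenode:8020/tmp/checkpoint-parquet")
  .option("path", "hdfs://namenode:8020/sample-topic")
  .start()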

On Fri, Jun 7, 2019 at 5:59 PM Chetan Khatri 
wrote:

> Hello Dear Spark Users,
>
> I am trying to write data from a Kafka topic to Parquet on HDFS with Structured
> Streaming, but I am getting failures. Please do help.
>
> val spark: SparkSession = 
> SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
> import spark.implicits._
> val dataFromTopicDF = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test")
>   .option("startingOffsets", "earliest")
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>
> logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
> val topicQuery = dataFromTopicDF.writeStream
>   .format("console")
>   .option("truncate", false)
>   .option("checkpointLocation", "/tmp/checkpoint")
>   .trigger(Trigger.ProcessingTime(10.seconds))
>   .start()
>
> topicQuery.awaitTermination()
> topicQuery.stop()
>
>
> Above code is working well but when I am trying to write to Parquet at HDFS 
> getting exceptions.
>
>
> logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")
>
> val parquetQuery = dataFromTopicDF.writeStream
> .format("parquet")
> .option("startingOffsets", "earliest")
> .option("checkpointLocation", "/tmp/checkpoint")
> .option("path", "/sample-topic")
> .start()
>
> parquetQuery.awaitTermination()
> parquetQuery.stop()
>
>
> *Exception Details:*
>
>
> Exception in thread "main" java.io.IOException: mkdir of 
> /sample-topic/_spark_metadata failed
>   at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
>   at 
> org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
>   at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
>   at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
>   at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
>   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>   at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:66)
>   at 
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
>   at 
> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
>   at 
> org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
>   at 
> com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
>   at 
> com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
>   at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
>   at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> Thanks
>
>


Spark SQL in R?

2019-06-07 Thread ya
Dear Felix and Richikesh and list,

Thank you very much for your previous help. So far I have tried two ways to
trigger Spark SQL: one is to use R with the sparklyr and SparkR libraries;
the other is to use the SparkR shell that ships with Spark. I am not connecting
to a remote Spark cluster, but to a local one. Both failed, with or without
hive-site.xml. I suspect the hive-site.xml content I found online was not
appropriate for this case, as the Spark session cannot be initialized after
adding this hive-site.xml. My questions are:

1. Is there any example for the content of hive-site.xml for this case?

2. I used the sql() function to call Spark SQL; is this the right way to do it?

###
##Here is the content in the hive-site.xml:##
###



<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://192.168.76.100:3306/hive?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123</value>
    <description>password to use against metastore database</description>
  </property>
</configuration>





##Here is what happened in R:##


> library(sparklyr) # load sparklyr package
> sc=spark_connect(master="local",spark_home="/Users/ya/Downloads/soft/spark-2.4.3-bin-hadoop2.7")
>  # connect sparklyr with spark
> sql('create database learnsql')
Error in sql("create database learnsql") : could not find function "sql"
> library(SparkR)

Attaching package: ‘SparkR’

The following object is masked from ‘package:sparklyr’:

collect

The following objects are masked from ‘package:stats’:

cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

as.data.frame, colnames, colnames<-, drop, endsWith, intersect, rank, rbind,
sample, startsWith, subset, summary, transform, union

> sql('create database learnsql')
Error in getSparkSession() : SparkSession not initialized
> Sys.setenv(SPARK_HOME='/Users/ya/Downloads/soft/spark-2.4.3-bin-hadoop2.7') 
> sparkR.session(sparkHome=Sys.getenv('/Users/ya/Downloads/soft/spark-2.4.3-bin-hadoop2.7'))
Spark not found in SPARK_HOME: 
Spark package found in SPARK_HOME: 
/Users/ya/Downloads/soft/spark-2.4.3-bin-hadoop2.7
Launching java with spark-submit command 
/Users/ya/Downloads/soft/spark-2.4.3-bin-hadoop2.7/bin/spark-submit   
sparkr-shell 
/var/folders/d8/7j6xswf92c3gmhwy_lrk63pmgn/T//Rtmpz22kK9/backend_port103d4cfcfd2c
 
19/06/08 11:14:57 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Error in handleErrors(returnStatus, conn) : 

…... hundreds of lines of information and mistakes here ……

> sql('create database learnsql')
Error in getSparkSession() : SparkSession not initialized



###
##Here is what happened in SparkR shell:##


Error in handleErrors(returnStatus, conn) : 
  java.lang.IllegalArgumentException: Error while instantiating 
'org.apache.spark.sql.hive.HiveSessionStateBuilder':
at 
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107)
at 
org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:145)
at 
org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:144)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
at 
org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
at 
org.apache.spark.sql.api.r.SQLUtils$$anonfun$setSparkContextSessionConf$2.apply(SQLUtils.scala:80)
at 
org.apache.spark.sql.api.r.SQLUtils$$anonfun$setSparkContextSessionConf$2.apply(SQLUtils.scala:79)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.Iterator$class.foreach(Iterator.sca
> sql('create database learnsql')
Error in getSparkSession() : SparkSession not initialized



Thank you very much.

YA







> On 8 Jun 2019, at 1:44 AM, Rishikesh Gawade  wrote:
> 
> Hi.
> 1. Yes, you can connect to Spark via R. If you are connecting to a remote 
> spark cluster then you'll need EITHER a spark binary along with hive-site.xml 
> in its config directory on the machine running R, OR a Livy server installed 
> on the cluster. You can then go on to use sparklyr, which, although it has 
> almost the same functions as SparkR, is recommended over the latter.

Spark 2.2 With Column usage

2019-06-07 Thread anbutech
Hi Sir,

Could you please advise how to fix the below issue with withColumn in a
Spark 2.2 / Scala 2.11 join?

def processing(spark: SparkSession,
               dataset1: Dataset[Reference],
               dataset2: Dataset[DataCore],
               dataset3: Dataset[ThirdPartyData],
               dataset4: Dataset[OtherData],
               date: String): Dataset[DataMerge] = {

  val referenceFiltered = dataset2
    .filter(_.dataDate == date)
    .filter(_.someColumn)
    .select("id")
    .toString

  dataset1.as("t1")
    .join(dataset3.as("t2"), col("t1.col1") === col("t2.col1"), JOINTYPE.Inner)
    .join(dataset4.as("t3"), col("t3.col1") === col("t1.col1"), JOINTYPE.Inner)
    .withColumn("new_column", lit(referenceFiltered))
    .selectExpr(
      "id",        // ---> want to get this value
      "column1",
      "column2",
      "column3",
      "column4")
}

How do I get the String value, let's say the value "124567"
(referenceFiltered), inside the withColumn?

I'm getting the withColumn output as "id: bigint". I want to get the same
value for all the records.

Note:

I have been asked not to use a cross join in the code. Is there any other way
to fix this issue?
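
One idea I have not tried yet (a rough sketch only; it assumes referenceFiltered
is supposed to hold a single id, and the field names are the ones from the code
above):

import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._  // assuming `spark` is the active SparkSession

// pull the single filtered id onto the driver, then embed it as a literal,
// so no cross join is needed
val referenceId: Long = dataset2
  .filter(_.dataDate == date)
  .filter(_.someColumn)
  .select("id")
  .as[Long]
  .head()  // would fail if the filter matches no rows

val joined = dataset1.as("t1")
  .join(dataset3.as("t2"), col("t1.col1") === col("t2.col1"), "inner")
  .join(dataset4.as("t3"), col("t3.col1") === col("t1.col1"), "inner")
  .withColumn("new_column", lit(referenceId))

Would something along these lines be acceptable, or is collecting the id to the
driver also ruled out?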


