Re: to_json not working with selectExpr

2017-07-16 Thread Burak Yavuz
Hi Matthew,

Which Spark version are you using? The expression `to_json` was added in
2.2 with this commit:
https://github.com/apache/spark/commit/0cdcf9114527a2c359c25e46fd6556b3855bfb28
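
For reference, a minimal sketch of both forms on 2.2+ (the DataFrame and column
names below are made up for illustration; assumes a spark-shell style
SparkSession named `spark`):

```
import spark.implicits._
import org.apache.spark.sql.functions.{struct, to_json}

// Hypothetical two-column DataFrame, only for illustration.
val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

// DataFrame API form (the one Matthew reports working with select):
df.select(to_json(struct($"id", $"name")).as("json")).show()

// SQL expression form; selectExpr parses this as SQL, so it needs the
// commit above, i.e. Spark 2.2+:
df.selectExpr("to_json(struct(id, name)) AS json").show()
```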

Best,
Burak

On Sun, Jul 16, 2017 at 6:24 PM, Matthew cao  wrote:

> Hi all,
> I just read the Databricks blog here:
> https://docs.databricks.com/_static/notebooks/complex-nested-structured.html
>
> When I try to follow the example in the to_json and selectExpr part, it
> gives the error: “org.apache.spark.sql.AnalysisException: Undefined function:
> 'to_json'. This function is neither a registered temporary function nor a
> permanent function registered in the database 'default'.; line 1 pos 0”.
> This error also shows up in the original Databricks notebook. I know that
> the to_json function works great with select. Am I missing something when
> using selectExpr? Thanks.
>
> Best,
> Matthew
>
>


to_json not working with selectExpr

2017-07-16 Thread Matthew cao
Hi all,
I just read the Databricks blog here:
https://docs.databricks.com/_static/notebooks/complex-nested-structured.html

When I try to follow the example in the to_json and selectExpr part, it gives
the error: “org.apache.spark.sql.AnalysisException: Undefined function: 'to_json'.
This function is neither a registered temporary function nor a permanent
function registered in the database 'default'.; line 1 pos 0”.
This error also shows up in the original Databricks notebook. I know that the
to_json function works great with select. Am I missing something when using
selectExpr? Thanks.

Best,
Matthew



Re: splitting columns into new columns

2017-07-16 Thread ayan guha
You are looking for the explode function.
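
If the goal is separate columns rather than extra rows, a rough sketch that
uses split and getItem in a loop instead (`data` and the `phone` column come
from your example quoted below; the maximum number of values per cell is an
assumption you would have to supply or compute):

```
import org.apache.spark.sql.functions.{col, split}

// Split the delimited column into an array column.
val withArray = data.withColumn("phone_arr", split(col("phone"), "\\^"))

// Upper bound on how many values one cell can hold (100+ in your case,
// kept at 2 here only to keep the sketch short).
val maxValues = 2

// Generate phone1, phone2, ... instead of writing each getItem by hand.
val phoneCols = (0 until maxValues).map(i => col("phone_arr").getItem(i).as(s"phone${i + 1}"))
withArray.select(phoneCols: _*).show()
```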

On Mon, 17 Jul 2017 at 4:25 am, nayan sharma wrote:

> I have a DataFrame where some columns contain multiple values, always
> separated by ^
>
> Input:
> phone|contact|
> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
>
> Desired output:
> phone1|phone2|contact1|contact2|
> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
>
> How can this be achieved with a loop, given that the number of separators
> between column values is not constant?
>
> data.withColumn("phone", split($"phone", "\\^")).select($"phone".getItem(0).as("phone1"), $"phone".getItem(1).as("phone2"))
>
> I thought of doing it this way, but the problem is that some columns have
> 100+ separators between the values.
>
>
>
> Thank you,
> Nayan
>
-- 
Best Regards,
Ayan Guha


splitting columns into new columns

2017-07-16 Thread nayan sharma
I have a DataFrame where some columns contain multiple values, always
separated by ^

Input:
phone|contact|
ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|

Desired output:
phone1|phone2|contact1|contact2|
ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|

How can this be achieved with a loop, given that the number of separators
between column values is not constant?

data.withColumn("phone", split($"phone", "\\^")).select($"phone".getItem(0).as("phone1"), $"phone".getItem(1).as("phone2"))

I thought of doing it this way, but the problem is that some columns have 100+
separators between the values.



Thank you,
Nayan

Re: Querying on Deeply Nested JSON Structures

2017-07-16 Thread Burak Yavuz
Have you checked out this blog post?
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

It shows tools and tips for working with nested data. With JSON you can access
nested fields directly through dot notation such as `field1.field2.field3`.
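
A tiny sketch of that dot-notation access (the JSON shape is invented just to
show the idea; assumes Spark 2.2+ for the Dataset[String] json reader and a
SparkSession named `spark`):

```
import spark.implicits._

val raw = Seq("""{"field1": {"field2": {"field3": {"mean": 1.5, "median": 2.0}}}}""").toDS()
val df = spark.read.json(raw)

// No explode needed to reach a single nested field:
df.select($"field1.field2.field3.mean").show()
df.selectExpr("field1.field2.field3.median").show()
```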

Best,
Burak

On Sat, Jul 15, 2017 at 10:45 AM, Matt Deaver  wrote:

> I would love to be told otherwise, but I believe your options are to
> either 1) use the explode function or 2) pre-process the data so you don't
> have to explode it.
>
> On Jul 15, 2017 11:41 AM, "Patrick"  wrote:
>
>> Hi,
>>
>> We need to query a deeply nested JSON structure. However, each query is on a
>> single field at a nested level, such as mean, median, or mode.
>>
>> I am aware of the SQL explode function.
>>
>> df = df_nested.withColumn('exploded', explode(top))
>>
>> But this is too slow.
>>
>> Is there any other strategy that would give us better performance when
>> querying nested JSON in a Spark Dataset?
>>
>>
>> Thanks
>>
>>
>>


Re: Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-16 Thread Yuval.Itzchakov
Yes, you do.
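
For completeness, a minimal sketch of where the checkpoint directory fits (the
source, batch interval, and path are all made up):

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

val conf = new SparkConf().setAppName("MapWithStateSketch")
val ssc = new StreamingContext(conf, Seconds(10))

// mapWithState keeps state across batches, so a checkpoint directory is required.
ssc.checkpoint("/tmp/streaming-checkpoints")

// Running count per key: (key, new value, state) => emitted record.
val updateCount = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val counts = words.map((_, 1)).reduceByKey(_ + _).mapWithState(StateSpec.function(updateCount))

counts.print()
ssc.start()
ssc.awaitTermination()
```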






RE: underlying checkpoint

2017-07-16 Thread Mendelson, Assaf
Actually, show is an action.
The issue is that unless you have some aggregation, show only goes over part of
the DataFrame, not all of it, and therefore the checkpoint isn't materialized
(similar to what happens with cache).
You need an action that goes over the entire DataFrame (which count does).
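
In other words, a minimal sketch of the fix to Bernard's program (Vadim's
transcript below shows the same sequence):

```
df.rdd.checkpoint()             // only marks the RDD for checkpointing
df.rdd.count()                  // full-scan action: the checkpoint is written here
println(df.rdd.isCheckpointed)  // should now print true
```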

Thanks,
  Assaf.

From: Bernard Jesop [mailto:bernard.je...@gmail.com]
Sent: Thursday, July 13, 2017 6:58 PM
To: Vadim Semenov
Cc: user
Subject: Re: underlying checkpoint

Thank you, one of my mistakes was to think that show() was an action.

2017-07-13 17:52 GMT+02:00 Vadim Semenov:
You need to trigger an action on that rdd to checkpoint it.

```
scala> spark.sparkContext.setCheckpointDir(".")

scala> val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+--+---+
|_1| _2|
+--+---+
| Scala| 35|
|Python| 30|
| R| 15|
|  Java| 20|
+--+---+


scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop wrote:
Hi everyone, I just tried this simple program:

import org.apache.spark.sql.SparkSession

object CheckpointTest extends App {

  val spark = SparkSession
    .builder()
    .appName("Toto")
    .getOrCreate()

  spark.sparkContext.setCheckpointDir(".")

  val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

  df.show()
  df.rdd.checkpoint()
  println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
}

But the result is still "not checkpointed".
Do you have any idea why, given that the checkpoint file is created?
Best regards,
Bernard JESOP