how to change data type for columns of dataframe
Hi, I got a dataframe object from another application, meaning this object was not generated by me. How can I change the data types of some columns in this dataframe? For example, change a column's type from Int to Float. Thanks.
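Since no reply is quoted above, here is a minimal pyspark sketch of the usual approach, casting a column in place; the column name "age" and the target type are only placeholders for whatever columns the dataframe actually has:

from pyspark.sql import functions as F

# Replace the existing integer column with a cast copy, keeping the same name.
df2 = df.withColumn("age", F.col("age").cast("double"))
df2.printSchema()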
data type missing
Hello, after I converted the dataframe to an RDD, I found the data type was missing.

scala> df.show
+----+---+
|name|age|
+----+---+
|jone| 12|
|rosa| 21|
+----+---+

scala> df.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

scala> df.rdd.map{ row => (row(0),row(1)) }.collect
res9: Array[(Any, Any)] = Array((jone,12), (rosa,21))

As you can see above, the elements' data type becomes (Any, Any). Can you help with this issue? Thanks.
spark as data warehouse?
In the past we have been using Hive for building the data warehouse. Do you think Spark can be used for this purpose? It seems even more real-time than Hive. Thanks.
Re: spark jobs don't require the master/worker to startup?
What I tried to say is that I didn't start the Spark master/worker at all for a standalone deployment, but I can still log into pyspark and run jobs. I don't know why.

$ ps -efw|grep spark
$ netstat -ntlp

Neither of the outputs above shows any Spark-related process or port. This machine is managed by myself and I know how to start Spark correctly; I just haven't started it yet, and I can still log into pyspark and run jobs. For example:

df = sc.parallelize([("t1",1),("t2",2)]).toDF(["name","number"])
df.show()
+----+------+
|name|number|
+----+------+
|  t1|     1|
|  t2|     2|
+----+------+

Do you know why? Thank you.

frakass
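The likely explanation (a hedged note, not taken from the thread itself): when pyspark is launched without --master, it falls back to local mode, which runs the driver and executors inside a single JVM and needs no standalone master/worker daemons. You can check which master the shell actually picked up:

# Inside the pyspark shell: inspect the master URL the session is using.
print(sc.master)              # typically "local[*]" when no master was given
print(sc.defaultParallelism)  # local cores used as the default parallelism

To run against the standalone cluster you would start the daemons and pass --master spark://<host>:7077 to pyspark or spark-submit.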
spark jobs don't require the master/worker to startup?
Hello, I have Spark 3.2.0 deployed on localhost in standalone mode. I didn't even run the start master and worker commands:

start-master.sh
start-worker.sh spark://127.0.0.1:7077

And the ports (such as 7077) were not open. But I can still log into pyspark and run jobs. Why does this happen? Thanks.
can dataframe API deal with subquery
Given this table definition:

desc people;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| name      | string     |          |
| born      | date       |          |
| sex       | struct     |          |
| contact   | map        |          |
| jobs      | array      |          |
+-----------+------------+----------+

and this SQL statement:

with t1 as (
  select name,
         case when sex.id=0 then "female" else "male" end as sex,
         jobs[1] as lastjob
  from people)
select * from t1 limit 10;

how does the dataframe API express this kind of subquery? Thank you.
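A hedged sketch of how the same query might look with the DataFrame API in pyspark (column names taken from the SQL above; a CTE simply becomes an intermediate DataFrame, so no special subquery construct is needed):

from pyspark.sql import functions as F

people = spark.table("people")

t1 = people.select(
    "name",
    F.when(F.col("sex.id") == 0, "female").otherwise("male").alias("sex"),
    F.col("jobs")[1].alias("lastjob"),
)
t1.limit(10).show()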
question on the different way of RDD to dataframe
Hello, I am converting some Python code to Scala. This works in Python:

rdd = sc.parallelize([('apple',1),('orange',2)])
rdd.toDF(['fruit','num']).show()
+------+---+
| fruit|num|
+------+---+
| apple|  1|
|orange|  2|
+------+---+

And in Scala:

scala> rdd.toDF("fruit","num").show()
+------+---+
| fruit|num|
+------+---+
| apple|  1|
|orange|  2|
+------+---+

But I have seen a lot of code that uses a case class for the conversion:

scala> case class Fruit(fruit:String,num:Int)
defined class Fruit

scala> rdd.map{case (x,y) => Fruit(x,y) }.toDF().show()
+------+---+
| fruit|num|
+------+---+
| apple|  1|
|orange|  2|
+------+---+

Do you know why a case class is used here? Thanks.
Re: add an auto_increment column
I have got the answer from Mich's reply. Thank you both.

frakass

On 08/02/2022 16:36, Gourav Sengupta wrote:
Hi, so do you want to rank apple and tomato both as 2? Not quite clear on the use case here though.
Regards, Gourav Sengupta

On Tue, Feb 8, 2022 at 7:10 AM wrote:
Hello Gourav, as you can see here, orderBy has already given the solution for the "equal amount" case:

df = sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
|cherry|     5|
| apple|     3|
|tomato|     3|
|orange|     2|
+------+------+

I want to add a column on the right named "top" whose value auto-increments from 1 to N. Thank you.

On 08/02/2022 13:52, Gourav Sengupta wrote:
Hi, sorry, once again I will try to understand the problem first :) As we can clearly see, the initial responses were incorrectly guessing the solution to be the monotonically_increasing function. What if there are two fruits with equal amount? For any real-life application, can we understand what you are trying to achieve with the rankings?
Regards, Gourav Sengupta

On Tue, Feb 8, 2022 at 4:22 AM ayan guha wrote:
For this requirement you can use rank or dense_rank.

On Tue, 8 Feb 2022 at 1:12 pm, wrote:
Hello, for this query:

df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
|tomato|     9|
| apple|     6|
|cherry|     5|
|orange|     3|
+------+------+

I want to add a column "top" whose value is 1, 2, 3... meaning top1, top2, top3... How can I do it? Thanks.

On 07/02/2022 21:18, Gourav Sengupta wrote:
Hi, can we understand the requirement first? What are you trying to achieve with an auto-increment id? Do you just want different IDs for the rows, do you also want to keep track of the record count of the table, or do you want to use them as surrogate keys so that, if you insert records into the table multiple times, they still get different values? I think without knowing the requirements, all of the above responses (like everything else where solutions are reached before the problem is understood) have a high chance of being wrong.
Regards, Gourav Sengupta

On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj wrote:
monotonically_increasing_id() will give the same functionality.

On Mon, 7 Feb, 2022, 6:57 am, wrote:
For a dataframe object, how can I add a column that auto-increments like MySQL's behavior? Thank you.

--
Best Regards,
Ayan Guha
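Mich's actual reply is not quoted above, but a typical way to get a 1..N "top" column that handles ties deterministically is a window function; a hedged pyspark sketch using the df defined in the thread:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number rows by amount, highest first; the fruit name is added as a
# tie-breaker so equal amounts get a stable, deterministic order.
w = Window.orderBy(F.desc("amount"), F.asc("fruit"))
df.withColumn("top", F.row_number().over(w)).show()

Note that a window with no partitionBy pulls all rows into a single partition, which is fine for small data but does not scale; Spark normally prints a warning about this.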
Re: add an auto_increment column
Hello Gourav, as you can see here, orderBy has already given the solution for the "equal amount" case:

df = sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
|cherry|     5|
| apple|     3|
|tomato|     3|
|orange|     2|
+------+------+

I want to add a column on the right named "top" whose value auto-increments from 1 to N. Thank you.

On 08/02/2022 13:52, Gourav Sengupta wrote:
Hi, sorry, once again I will try to understand the problem first :) As we can clearly see, the initial responses were incorrectly guessing the solution to be the monotonically_increasing function. What if there are two fruits with equal amount? For any real-life application, can we understand what you are trying to achieve with the rankings?
Regards, Gourav Sengupta

On Tue, Feb 8, 2022 at 4:22 AM ayan guha wrote:
For this requirement you can use rank or dense_rank.

On Tue, 8 Feb 2022 at 1:12 pm, wrote:
Hello, for this query:

df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
|tomato|     9|
| apple|     6|
|cherry|     5|
|orange|     3|
+------+------+

I want to add a column "top" whose value is 1, 2, 3... meaning top1, top2, top3... How can I do it? Thanks.

On 07/02/2022 21:18, Gourav Sengupta wrote:
Hi, can we understand the requirement first? What are you trying to achieve with an auto-increment id? Do you just want different IDs for the rows, do you also want to keep track of the record count of the table, or do you want to use them as surrogate keys so that, if you insert records into the table multiple times, they still get different values? I think without knowing the requirements, all of the above responses (like everything else where solutions are reached before the problem is understood) have a high chance of being wrong.
Regards, Gourav Sengupta

On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj wrote:
monotonically_increasing_id() will give the same functionality.

On Mon, 7 Feb, 2022, 6:57 am, wrote:
For a dataframe object, how can I add a column that auto-increments like MySQL's behavior? Thank you.

--
Best Regards,
Ayan Guha
Re: add an auto_increment column
Hello, for this query:

df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
|tomato|     9|
| apple|     6|
|cherry|     5|
|orange|     3|
+------+------+

I want to add a column "top" whose value is 1, 2, 3... meaning top1, top2, top3... How can I do it? Thanks.

On 07/02/2022 21:18, Gourav Sengupta wrote:
Hi, can we understand the requirement first? What are you trying to achieve with an auto-increment id? Do you just want different IDs for the rows, do you also want to keep track of the record count of the table, or do you want to use them as surrogate keys so that, if you insert records into the table multiple times, they still get different values? I think without knowing the requirements, all of the above responses (like everything else where solutions are reached before the problem is understood) have a high chance of being wrong.
Regards, Gourav Sengupta

On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj wrote:
monotonically_increasing_id() will give the same functionality.

On Mon, 7 Feb, 2022, 6:57 am, wrote:
For a dataframe object, how can I add a column that auto-increments like MySQL's behavior? Thank you.
Re: TypeError: Can not infer schema for type:
Thanks for the reply. It looks strange that in the Scala shell I can do this conversion directly:

scala> sc.parallelize(List(3,2,1,4)).toDF.show
+-----+
|value|
+-----+
|    3|
|    2|
|    1|
|    4|
+-----+

But in pyspark I have to write it as:

sc.parallelize([3,2,1,4]).map(lambda x: (x,1)).toDF(['id','count']).show()
+---+-----+
| id|count|
+---+-----+
|  3|    1|
|  2|    1|
|  1|    1|
|  4|    1|
+---+-----+

So there are differences between the pyspark and Scala implementations. Thanks.
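A hedged side note: pyspark cannot infer a schema from bare Python ints, but it can skip inference when you supply a schema explicitly, which gets close to the Scala one-liner. A sketch of two variants that should work in recent pyspark versions:

# Supplying an explicit element type avoids schema inference on plain ints;
# the resulting single column is typically named "value".
spark.createDataFrame([3, 2, 1, 4], "int").show()

# Or wrap each element in a one-field tuple and name the column yourself.
sc.parallelize([3, 2, 1, 4]).map(lambda x: (x,)).toDF(["value"]).show()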
TypeError: Can not infer schema for type:
rdd = sc.parallelize([3,2,1,4])
rdd.toDF().show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/session.py", line 66, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/opt/spark/python/pyspark/sql/session.py", line 675, in createDataFrame
    return self._create_dataframe(data, schema, samplingRatio, verifySchema)
  File "/opt/spark/python/pyspark/sql/session.py", line 698, in _create_dataframe
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/spark/python/pyspark/sql/session.py", line 486, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/opt/spark/python/pyspark/sql/session.py", line 466, in _inferSchema
    schema = _infer_schema(first, names=names)
  File "/opt/spark/python/pyspark/sql/types.py", line 1067, in _infer_schema
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <class 'int'>

Why does this fail in my pyspark? I don't understand it. Thanks for any help.
Re: dataframe doesn't support higher order func, right?
Indeed, in spark-shell I always leave out the parentheses:

scala> sc.parallelize(List(3,2,1,4)).toDF.show
+-----+
|value|
+-----+
|    3|
|    2|
|    1|
|    4|
+-----+

So I thought it would be OK in pyspark too. But even with the parentheses this still doesn't work. Why?

sc.parallelize([3,2,1,4]).toDF().show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/session.py", line 66, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/opt/spark/python/pyspark/sql/session.py", line 675, in createDataFrame
    return self._create_dataframe(data, schema, samplingRatio, verifySchema)
  File "/opt/spark/python/pyspark/sql/session.py", line 698, in _create_dataframe
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/spark/python/pyspark/sql/session.py", line 486, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/opt/spark/python/pyspark/sql/session.py", line 466, in _inferSchema
    schema = _infer_schema(first, names=names)
  File "/opt/spark/python/pyspark/sql/types.py", line 1067, in _infer_schema
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <class 'int'>

spark 3.2.0

On 07/02/2022 11:44, Sean Owen wrote:
This is just basic Python - you're missing parentheses on toDF, so you are not calling the function nor getting its result.

On Sun, Feb 6, 2022 at 9:39 PM wrote:
I am a bit confused why this doesn't work in pyspark:

x = sc.parallelize([3,2,1,4])
x.toDF.show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'function' object has no attribute 'show'

Thank you.
Re: dataframe doesn't support higher order func, right?
I am a bit confused why this doesn't work in pyspark:

x = sc.parallelize([3,2,1,4])
x.toDF.show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'function' object has no attribute 'show'

Thank you.
add an auto_increment column
For a dataframe object, how can I add a column that auto-increments like MySQL's behavior? Thank you.
dataframe doesn't support higher order func, right?
For example, this works for an RDD object:

scala> val li = List(3,2,1,4,0)
li: List[Int] = List(3, 2, 1, 4, 0)

scala> val rdd = sc.parallelize(li)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.filter(_ > 2).collect()
res0: Array[Int] = Array(3, 4)

After I convert the RDD to a dataframe, the same filter won't work:

scala> val df = rdd.toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.filter(_ > 2).show()
<console>:24: error: value > is not a member of org.apache.spark.sql.Row
       df.filter(_ > 2).show()

But this works:

scala> df.filter($"value" > 2).show()
+-----+
|value|
+-----+
|    3|
|    4|
+-----+

Where can I check all the methods supported by dataframe? Thank you.

Frakass
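The same distinction exists in pyspark, where it is perhaps easier to see: DataFrame.filter accepts a Column expression or a SQL string rather than an arbitrary function, while RDD.filter takes a plain function. A small illustrative sketch:

from pyspark.sql import functions as F

df = spark.range(5).toDF("value")                      # one-column DataFrame: 0..4
df.filter(F.col("value") > 2).show()                   # column expression
df.filter("value > 2").show()                          # SQL string, same result
df.rdd.filter(lambda row: row["value"] > 2).count()    # back on the RDD, a Python function works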
Re: help check my simple job
That did resolve my issue. Thanks a lot.

frakass

On 06/02/2022 17:25, Hannes Bibel wrote:
Hi, it looks like you're packaging your application for Scala 2.13 (it should be specified in your build.sbt) while your Spark installation is built for Scala 2.12.

Go to https://spark.apache.org/downloads.html and select, under "Choose a package type", the package type that says "Scala 2.13". With that release you should be able to run your application.

In general, minor versions of Scala (e.g. 2.12 and 2.13) are incompatible.

Best, Hannes

On Sun, Feb 6, 2022 at 10:01 AM wrote:
Hello, I wrote this simple job in Scala:

$ cat Myjob.scala
import org.apache.spark.sql.SparkSession

object Myjob {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.appName("Simple Application").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    val arrayRDD = sparkContext.parallelize(List(1,2,3,4,5,6,7,8))
    println(arrayRDD.getClass, arrayRDD.count())
  }
}

After packaging it I submit it to Spark and get this error:

$ /opt/spark/bin/spark-submit --class "Myjob" --master local[4] target/scala-2.13/my-job_2.13-1.0.jar
Exception in thread "main" java.lang.NoSuchMethodError: 'scala.collection.immutable.ArraySeq scala.runtime.ScalaRunTime$.wrapIntArray(int[])'
	at Myjob$.main(Myjob.scala:8)
	at Myjob.main(Myjob.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What's the issue? Thank you.
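For reference, the other way around (keeping the stock Scala 2.12 build of Spark and compiling the job against it) would mean pinning the Scala version in build.sbt. A hedged sketch with illustrative version numbers, which would need to match the installed Spark:

// build.sbt (illustrative only; versions are assumptions)
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.0" % "provided"

The %% operator appends the Scala binary version (for example _2.12) to the artifact name, so the compiled jar and the Spark runtime stay on the same Scala minor version.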
help check my simple job
Hello, I wrote this simple job in Scala:

$ cat Myjob.scala
import org.apache.spark.sql.SparkSession

object Myjob {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.appName("Simple Application").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    val arrayRDD = sparkContext.parallelize(List(1,2,3,4,5,6,7,8))
    println(arrayRDD.getClass, arrayRDD.count())
  }
}

After packaging it I submit it to Spark and get this error:

$ /opt/spark/bin/spark-submit --class "Myjob" --master local[4] target/scala-2.13/my-job_2.13-1.0.jar
Exception in thread "main" java.lang.NoSuchMethodError: 'scala.collection.immutable.ArraySeq scala.runtime.ScalaRunTime$.wrapIntArray(int[])'
	at Myjob$.main(Myjob.scala:8)
	at Myjob.main(Myjob.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What's the issue? Thank you.
how can I remove the warning message
When I submit a job from the Scala client, I get these warning messages:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

How can I get rid of these messages?

spark: 3.2.0
scala: 2.13.7

Thank you.
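These warnings come from the JVM itself on Java 9 and later, not from Spark's logger, so log4j settings will not silence them. One commonly suggested workaround, offered here as a hedged sketch rather than an official recipe, is to open the affected packages explicitly via extra JVM options (the class and jar names are just taken from the earlier thread for illustration):

/opt/spark/bin/spark-submit \
  --conf "spark.driver.extraJavaOptions=--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED" \
  --class Myjob target/scala-2.13/my-job_2.13-1.0.jar

The exact set of --add-opens flags depends on which packages the warnings mention; running on Java 8 also avoids the messages entirely.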
question for definition of column types
When creating a dataframe from a list, how can I specify the column types? For instance:

df = spark.createDataFrame(list,["name","title","salary","rate","insurance"])
df.show()
+-----------+---------+------+----+---------+
|       name|    title|salary|rate|insurance|
+-----------+---------+------+----+---------+
|buck trends|      ceo|    20|0.25|      100|
|cindy banks|      cfo|    17|0.22|      120|
|  joe coder|developer|    13| 0.2|      120|
+-----------+---------+------+----+---------+

df.describe()
DataFrame[summary: string, name: string, title: string, salary: string, rate: string, insurance: string]

I want salary, rate and insurance to be Double, not String. Thank you.

Frakass
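A hedged pyspark sketch: instead of passing only column names, pass a schema (here as a DDL string) so the numeric columns come out as doubles. The rows below approximate the show() output above:

rows = [("buck trends", "ceo", 20.0, 0.25, 100.0),
        ("cindy banks", "cfo", 17.0, 0.22, 120.0),
        ("joe coder", "developer", 13.0, 0.2, 120.0)]

# A DDL schema string fixes the column names and types up front.
df = spark.createDataFrame(
    rows, "name string, title string, salary double, rate double, insurance double")
df.printSchema()   # salary, rate and insurance are double

An equivalent alternative is to build a StructType with DoubleType fields, or to cast the columns afterwards as in the cast example earlier in this digest.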
Re: unsubscribe
On 22/01/2022 11:07, Renan F. Souza wrote:
unsubscribe

You should be able to unsubscribe yourself from the list by sending an email to: user-unsubscr...@spark.apache.org

Thanks.
newbie question for reduce
Hello, please help me take a look at why this simple reduce doesn't work:

rdd = sc.parallelize([("a",1),("b",2),("c",3)])
rdd.reduce(lambda x,y: x[1]+y[1])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/rdd.py", line 1001, in reduce
    return reduce(f, vals)
  File "/opt/spark/python/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 1, in <lambda>
TypeError: 'int' object is not subscriptable

spark 3.2.0

Thank you.
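The likely cause, offered as a hedged note: reduce must return the same kind of value it consumes, because each step's result is fed back in as x for the next step. Here the first step returns the int 1+2, and the next step then evaluates 3[1], which raises the error above. Two ways to sum the values:

# Project to the numeric part first, then reduce over ints only.
rdd.map(lambda kv: kv[1]).reduce(lambda x, y: x + y)   # -> 6

# Or use the built-in helpers for pair RDDs.
rdd.values().sum()                                     # -> 6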
question of shorten syntax for rdd
Hello, may I know from which version of Spark the RDD syntax can be shortened like this?

rdd.groupByKey().mapValues(lambda x: len(x)).collect()
[('b', 2), ('d', 1), ('a', 2)]

rdd.groupByKey().mapValues(len).collect()
[('b', 2), ('d', 1), ('a', 2)]

I know that in Scala the syntax xxx(x => x.len) can be written as xxx(_.len). But I never knew that in pyspark the "_" placeholder could even be omitted. Thank you.
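A hedged clarification: this is plain Python rather than a Spark feature. mapValues just needs a callable, and len already is one, so passing it directly has worked in every pyspark version; lambda x: len(x) merely wraps len in another function. A tiny sketch:

# len is already a function object; mapValues(len) passes it directly.
counts = rdd.groupByKey().mapValues(len)

# The same idea in plain Python, with no Spark involved:
list(map(len, ["aa", "b"]))   # -> [2, 1]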