Re: Parallel write to different partitions
Found this issue reported earlier, but it was bulk-closed: https://issues.apache.org/jira/browse/SPARK-27030

Regards,
Shrikant

On Fri, 22 Sep 2023 at 12:03 AM, Shrikant Prasad wrote:
> Hi all,
>
> We have multiple Spark jobs running in parallel trying to write into the same Hive table, each job writing into a different partition. This was working fine with Spark 2.3 and Hadoop 2.7.
>
> But after upgrading to Spark 3.2 and Hadoop 3.2.2, these parallel jobs are failing with FileNotFound exceptions for files under the /warehouse/db/table/temporary/0/ directory.
>
> It seems the temporary dir was earlier created under the partition being written, but now it's created directly under the table directory, which causes concurrency issues when multiple jobs try to clean up the same temporary directory.
>
> Is there a way now to achieve parallel writes to different partitions of the same table? Also, any insight into what caused the change in behavior of temporary dir creation would be helpful.
>
> Thanks and regards,
> Shrikant
Parallel write to different partitions
Hi all,

We have multiple Spark jobs running in parallel trying to write into the same Hive table, each job writing into a different partition. This was working fine with Spark 2.3 and Hadoop 2.7.

But after upgrading to Spark 3.2 and Hadoop 3.2.2, these parallel jobs are failing with FileNotFound exceptions for files under the /warehouse/db/table/temporary/0/ directory.

It seems the temporary dir was earlier created under the partition being written, but now it's created directly under the table directory, which causes concurrency issues when multiple jobs try to clean up the same temporary directory.

Is there a way now to achieve parallel writes to different partitions of the same table? Also, any insight into what caused the change in behavior of temporary dir creation would be helpful.

Thanks and regards,
Shrikant
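One avenue worth experimenting with (a sketch only, not a confirmed fix: whether hive.exec.stagingdir is honored for this write path in Spark 3.2, and whether it avoids the shared table-level temporary directory, needs verification; all table, partition, and path names below are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object PartitionWriter {
  def main(args: Array[String]): Unit = {
    val jobId = args(0) // unique id per parallel job, e.g. the partition value

    val spark = SparkSession.builder()
      .appName(s"partition-writer-$jobId")
      // Point each job at its own staging area so concurrent jobs cannot
      // delete each other's temporary files during cleanup.
      .config("hive.exec.stagingdir", s"/tmp/hive-staging-$jobId")
      .enableHiveSupport()
      .getOrCreate()

    // Each job writes only its own partition (hypothetical table names).
    spark.sql(
      s"INSERT OVERWRITE TABLE db.target_table PARTITION (dt = '$jobId') " +
      s"SELECT * FROM db.staging_$jobId")
  }
}
```

If the staging directory setting turns out not to be honored, the safer fallback is to serialize the writes or route each job's output through a distinct table/location and attach partitions afterwards.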
Re: Spark migration from 2.3 to 3.0.1
I agree with you that it's not the recommended approach. But I just want to understand which change caused this change in behavior. If you can point me to some Jira in which this change was made, that would be greatly appreciated.

Regards,
Shrikant

On Mon, 2 Jan 2023 at 9:46 PM, Sean Owen wrote:
> Not true, you've never been able to use the SparkSession inside a Spark task. You aren't actually using it, if the application worked in Spark 2.x. Now, you need to avoid accidentally serializing it, which was the right thing to do even in Spark 2.x. Just move the session inside main(), not a member.
> Or what other explanation do you have? I don't understand.
>
> On Mon, Jan 2, 2023 at 10:10 AM Shrikant Prasad wrote:
>> If that was the case and the deserialized session would not work, the application would not have worked.
>> As per the logs and debug prints, in Spark 2.3 the main object is not getting deserialized in the executor; otherwise it would have failed then also.
>>
>> On Mon, 2 Jan 2023 at 9:15 PM, Sean Owen wrote:
>>> It silently allowed the object to serialize, though the serialized/deserialized session would not work. Now it explicitly fails.
>>>
>>> On Mon, Jan 2, 2023 at 9:43 AM Shrikant Prasad wrote:
>>>> That's right. But the serialization would be happening in Spark 2.3 also; why don't we see this error there?
>>>>
>>>> On Mon, 2 Jan 2023 at 9:09 PM, Sean Owen wrote:
>>>>> Oh, it's because you are defining "spark" within your driver object, and then it's getting serialized because you are trying to use TestMain methods in your program.
>>>>> This was never correct, but now it's an explicit error in Spark 3. The session should not be a member variable.
>>>>>
>>>>> On Mon, Jan 2, 2023 at 9:24 AM Shrikant Prasad wrote:
>>>>>> Please see these logs.
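Sean's suggested fix, sketched against the TestMain repro quoted later in this thread (a sketch, not tested here): make the session a local of main() rather than a member of the object, so serializing task closures that reference TestMain no longer drags the session along.

```scala
import org.apache.spark.sql.SparkSession

object TestMain {
  def main(args: Array[String]): Unit = {
    // The session is created inside main(), so it is never captured as part
    // of the TestMain object when task closures are serialized to executors.
    val session = SparkSession.builder()
      .appName("test")
      .enableHiveSupport()
      .getOrCreate()
    import session.implicits._

    val a = session.sparkContext
      .parallelize(Array(("A", 1), ("B", 2)))
      .toDF("_c1", "_c2")
      .rdd
      .map(x => x(0).toString)
      .collect()
    println(a.mkString("|"))
  }
}
```

With the session as a local, deserializing the lambda on the executor no longer triggers TestMain's static initializer, which is where the "A master URL must be set" failure originates.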
>>>>>> The error is thrown in the executor:
>>>>>>
>>>>>> 23/01/02 15:14:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
>>>>>> java.lang.ExceptionInInitializerError
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>     at …
Re: Spark migration from 2.3 to 3.0.1
If that was the case and the deserialized session would not work, the application would not have worked.

As per the logs and debug prints, in Spark 2.3 the main object is not getting deserialized in the executor; otherwise it would have failed then also.

On Mon, 2 Jan 2023 at 9:15 PM, Sean Owen wrote:
> It silently allowed the object to serialize, though the serialized/deserialized session would not work. Now it explicitly fails.
>
> On Mon, Jan 2, 2023 at 9:43 AM Shrikant Prasad wrote:
>> That's right. But the serialization would be happening in Spark 2.3 also; why don't we see this error there?
>>
>> On Mon, 2 Jan 2023 at 9:09 PM, Sean Owen wrote:
>>> Oh, it's because you are defining "spark" within your driver object, and then it's getting serialized because you are trying to use TestMain methods in your program.
>>> This was never correct, but now it's an explicit error in Spark 3. The session should not be a member variable.
>>>
>>> On Mon, Jan 2, 2023 at 9:24 AM Shrikant Prasad wrote:
>>>> Please see these logs.
>>>> The error is thrown in the executor:
>>>>
>>>> [stack trace trimmed; identical to the one in the original message later in this thread]
Re: Spark migration from 2.3 to 3.0.1
That's right. But the serialization would be happening in Spark 2.3 also; why don't we see this error there?

On Mon, 2 Jan 2023 at 9:09 PM, Sean Owen wrote:
> Oh, it's because you are defining "spark" within your driver object, and then it's getting serialized because you are trying to use TestMain methods in your program.
> This was never correct, but now it's an explicit error in Spark 3. The session should not be a member variable.
>
> On Mon, Jan 2, 2023 at 9:24 AM Shrikant Prasad wrote:
>> Please see these logs. The error is thrown in the executor:
>>
>> [stack trace trimmed; identical to the one in the original message later in this thread]
Re: Spark migration from 2.3 to 3.0.1
Please see these logs. The error is thrown in the executor:

23/01/02 15:14:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ExceptionInInitializerError
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
    at TestMain$.<clinit>(TestMain.scala:12)
    at TestMain$.<clinit>(TestMain.scala)

On Mon, 2 Jan 2023 at 8:29 PM, Sean Owen wrote:
> It's not running on the executor; that's not the issue. See your stack trace, where it clearly happens in the driver.
>
> On Mon, Jan 2, 2023 at 8:58 AM Shrikant Prasad wrote:
>> Even if I set the master as yarn, it will not have access to the rest of the Spark confs. It will need spark.yarn.app.id.
>> The main issue is: if it's working as-is in Spark 2.3, why is it not working in Spark 3, i.e. why is the session getting created on the executor?
>> Another thing we tried is removing the df-to-rdd conversion just for debug, and it works in Spark 3.
>> So it might be something to do with the df-to-rdd conversion, or a serialization behavior change from Spark 2.3 to Spark 3.0 if there is any. But we couldn't find the root cause.
>>
>> Regards,
>> Shrikant
>>
>> On Mon, 2 Jan 2023 at 7:54 PM, Sean Owen wrote:
>>> So call .setMaster("yarn"), per the error
>>>
>>> On Mon, Jan 2, 2023 at 8:20 AM Shrikant Prasad wrote:
Re: Spark migration from 2.3 to 3.0.1
Even if I set the master as yarn, it will not have access to the rest of the Spark confs. It will need spark.yarn.app.id.

The main issue is: if it's working as-is in Spark 2.3, why is it not working in Spark 3, i.e. why is the session getting created on the executor?

Another thing we tried is removing the df-to-rdd conversion just for debug, and it works in Spark 3.

So it might be something to do with the df-to-rdd conversion, or a serialization behavior change from Spark 2.3 to Spark 3.0 if there is any. But we couldn't find the root cause.

Regards,
Shrikant

On Mon, 2 Jan 2023 at 7:54 PM, Sean Owen wrote:
> So call .setMaster("yarn"), per the error
>
> On Mon, Jan 2, 2023 at 8:20 AM Shrikant Prasad wrote:
>> We are running it in cluster deploy mode with yarn.
>>
>> Regards,
>> Shrikant
>>
>> On Mon, 2 Jan 2023 at 6:15 PM, Stelios Philippou wrote:
>>> Can we see your Spark configuration parameters?
>>> The master URL refers to, as per Java, new SparkConf().setMaster("local[*]"), according to where you want to run this.
>>>
>>> On Mon, 2 Jan 2023 at 14:38, Shrikant Prasad wrote:
>>>> Hi,
>>>>
>>>> I am trying to migrate one Spark application from Spark 2.3 to 3.0.1.
>>>>
>>>> The issue can be reproduced using the sample code below:
>>>>
>>>> object TestMain {
>>>>   val session = SparkSession.builder().appName("test").enableHiveSupport().getOrCreate()
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     import session.implicits._
>>>>     val a = session.sparkContext.parallelize(Array(("A", 1), ("B", 2)))
>>>>       .toDF("_c1", "_c2").rdd.map(x => x(0).toString).collect()
>>>>     println(a.mkString("|"))
>>>>   }
>>>> }
>>>>
>>>> It runs successfully in Spark 2.3 but fails with Spark 3.0.1 with the exception below:
>>>>
>>>> Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
>>>>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:394)
>>>>     at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>>>     at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>>>     at scala.Option.getOrElse(Option.scala:189)
>>>>     at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>>>     at TestMain$.<clinit>(TestMain.scala:7)
>>>>     at TestMain$.<clinit>(TestMain.scala)
>>>>
>>>> From the exception it appears that it tries to create the Spark session on the executor also in Spark 3, whereas it's not created again on the executor in Spark 2.3.
>>>>
>>>> Can anyone help in identifying why there is this change in behavior?
>>>>
>>>> Thanks and regards,
>>>> Shrikant
>>>>
>>>> --
>>>> Regards,
>>>> Shrikant Prasad

--
Regards,
Shrikant Prasad
Re: Spark migration from 2.3 to 3.0.1
We are running it in cluster deploy mode with yarn.

Regards,
Shrikant

On Mon, 2 Jan 2023 at 6:15 PM, Stelios Philippou wrote:
> Can we see your Spark configuration parameters?
> The master URL refers to, as per Java, new SparkConf().setMaster("local[*]"), according to where you want to run this.
>
> On Mon, 2 Jan 2023 at 14:38, Shrikant Prasad wrote:
>> Hi,
>>
>> I am trying to migrate one Spark application from Spark 2.3 to 3.0.1.
>>
>> The issue can be reproduced using the sample code below:
>>
>> object TestMain {
>>   val session = SparkSession.builder().appName("test").enableHiveSupport().getOrCreate()
>>
>>   def main(args: Array[String]): Unit = {
>>     import session.implicits._
>>     val a = session.sparkContext.parallelize(Array(("A", 1), ("B", 2)))
>>       .toDF("_c1", "_c2").rdd.map(x => x(0).toString).collect()
>>     println(a.mkString("|"))
>>   }
>> }
>>
>> It runs successfully in Spark 2.3 but fails with Spark 3.0.1 with the exception below:
>>
>> Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
>>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:394)
>>     at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>     at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>     at scala.Option.getOrElse(Option.scala:189)
>>     at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>     at TestMain$.<clinit>(TestMain.scala:7)
>>     at TestMain$.<clinit>(TestMain.scala)
>>
>> From the exception it appears that it tries to create the Spark session on the executor also in Spark 3, whereas it's not created again on the executor in Spark 2.3.
>>
>> Can anyone help in identifying why there is this change in behavior?
>>
>> Thanks and regards,
>> Shrikant
>>
>> --
>> Regards,
>> Shrikant Prasad

--
Regards,
Shrikant Prasad
Spark migration from 2.3 to 3.0.1
Hi,

I am trying to migrate one Spark application from Spark 2.3 to 3.0.1.

The issue can be reproduced using the sample code below:

object TestMain {
  val session = SparkSession.builder().appName("test").enableHiveSupport().getOrCreate()

  def main(args: Array[String]): Unit = {
    import session.implicits._
    val a = session.sparkContext.parallelize(Array(("A", 1), ("B", 2)))
      .toDF("_c1", "_c2").rdd.map(x => x(0).toString).collect()
    println(a.mkString("|"))
  }
}

It runs successfully in Spark 2.3 but fails with Spark 3.0.1 with the exception below:

Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:394)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
    at TestMain$.<clinit>(TestMain.scala:7)
    at TestMain$.<clinit>(TestMain.scala)

From the exception it appears that it tries to create the Spark session on the executor also in Spark 3, whereas it's not created again on the executor in Spark 2.3.

Can anyone help in identifying why there is this change in behavior?

Thanks and regards,
Shrikant

--
Regards,
Shrikant Prasad
Re: sequence file write
I have tried with that also. It gives the same exception: ClassNotFoundException: sequencefile.DefaultSource

Regards,
Shrikant

On Mon, 14 Nov 2022 at 6:35 PM, Jie Han wrote:
> It seems that the name is "sequencefile".
>
> On 14 Nov 2022 at 20:59, Shrikant Prasad wrote:
>> Hi,
>>
>> I have an application which writes a dataframe into a sequence file using df.write.format("sequence").insertInto("hivetable1")
>>
>> This was working fine with Spark 2.7. Now I am trying to migrate to Spark 3.2 and getting a ClassNotFoundException: sequence.DefaultSource error with Spark 3.2.
>>
>> Is there any change in sequence file support in 3.2, or is any code change required to make it work?
>>
>> Thanks and regards,
>> Shrikant
>>
>> --
>> Regards,
>> Shrikant Prasad

--
Regards,
Shrikant Prasad
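If no DataFrame source named "sequence" or "sequencefile" resolves in Spark 3.2, one fallback to try (a sketch only; the table, column, and path names are hypothetical) is the RDD API, which writes Hadoop SequenceFiles directly via saveAsSequenceFile:

```scala
import org.apache.spark.sql.SparkSession

object SequenceFileFallback {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("seqfile-fallback")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.table("db.source_table") // hypothetical source table

    // saveAsSequenceFile is available on RDDs of key/value pairs whose
    // types are convertible to Hadoop Writables (String -> Text here).
    df.rdd
      .map(row => (row.getAs[String]("key_col"), row.getAs[String]("value_col")))
      .saveAsSequenceFile("/warehouse/db/hivetable1/dt=2022-11-14") // hypothetical path
  }
}
```

Note this writes files only; if the target is a Hive table, the partition/location would still need to be registered with the metastore separately.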
sequence file write
Hi,

I have an application which writes a dataframe into a sequence file using:

df.write.format("sequence").insertInto("hivetable1")

This was working fine with Spark 2.7. Now I am trying to migrate to Spark 3.2, and I am getting a ClassNotFoundException: sequence.DefaultSource error with Spark 3.2.

Is there any change in sequence file support in 3.2, or is any code change required to make it work?

Thanks and regards,
Shrikant

--
Regards,
Shrikant Prasad
Re: Dynamic allocation on K8
Hi Nikhil,

Spark on Kubernetes supports dynamic allocation using the shuffle-tracking feature instead of the external shuffle service. To enable dynamic allocation, set these two configs to true: spark.dynamicAllocation.enabled and spark.dynamicAllocation.shuffleTracking.enabled.

Regards,
Shrikant

On Tue, 25 Oct 2022 at 10:44 PM, Nikhil Goyal wrote:
> Hi folks,
> When running Spark on Kubernetes, is it possible to use dynamic allocation? Some blog posts <https://spot.io/blog/setting-up-managing-monitoring-spark-on-kubernetes/> mention that dynamic allocation is available; however, I am not sure how it works. The Spark official docs <https://spark.apache.org/docs/latest/running-on-kubernetes.html#future-work> say that the shuffle service is not yet available.
>
> Thanks
> Nikhil

--
Regards,
Prasad Paravatha
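Concretely, the two settings named above can be supplied via spark-submit --conf flags or in the session builder; a minimal sketch (the application name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable dynamic allocation on Kubernetes without an
// external shuffle service, by letting Spark track shuffle files itself
// and keep executors holding live shuffle data alive.
val spark = SparkSession.builder()
  .appName("dyn-alloc-on-k8s") // illustrative name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .getOrCreate()
```

The trade-off of shuffle tracking is that executors owning shuffle files needed by live stages are not scaled down until those files are no longer required.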
Spark on k8s issues with s3a committer dependencies or config?
Hi all,

I am trying out Spark 3.2.1 on k8s using Hadoop 3.3.1 and running into issues writing to an S3 bucket using TemporaryAWSCredentialsProvider:
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Using_Session_Credentials_with_TemporaryAWSCredentialsProvider

While reading from S3 works, I am getting a 403 Access Denied error while writing to the KMS-enabled bucket. I am wondering if I am missing some dependency jars or client configuration properties. I would appreciate it if someone could give me a few pointers on this.

Regards,
Prasad Paravatha
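For reference, a sketch of the client-side S3A configuration usually involved here (the property names are Hadoop S3A keys from the hadoop-aws documentation; all values are placeholders, and a 403 can equally come from the bucket or KMS key policy rather than the client config):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-temp-creds") // illustrative name
  // Session credentials: access key, secret key, AND session token.
  .config("spark.hadoop.fs.s3a.aws.credentials.provider",
          "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
  .config("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .config("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
  .config("spark.hadoop.fs.s3a.session.token", sys.env("AWS_SESSION_TOKEN"))
  // SSE-KMS bucket: writes also need KMS permissions (e.g. kms:GenerateDataKey)
  // on this key, or S3 may answer 403 even though reads succeed.
  .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
  .config("spark.hadoop.fs.s3a.server-side-encryption.key",
          "arn:aws:kms:region:account:key/key-id") // placeholder ARN
  .getOrCreate()
```

Since reads work but writes fail, the KMS side is worth checking first: reads of SSE-KMS objects need kms:Decrypt, while writes additionally need kms:GenerateDataKey on the bucket's key.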
CPU usage from Event log
Hi,

I am trying to calculate the CPU utilization of an executor (JVM-level CPU usage) using the event log. Can someone please help me with this?

1) Which columns/properties to select
2) The correct formula to derive CPU usage

Has anyone done anything similar to this? We have many pipelines, and those are using very large EMR clusters. I am trying to find out the CPU and memory utilization of the nodes. This will help me determine whether the clusters are under-utilized so we can reduce the nodes. Is there a better way to get these stats without changing the code?

Thanks,
Prasad
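A sketch of one approach (the field names are as they appear in Spark's JSON event log, but should be verified against your Spark version; the log path is a placeholder): each SparkListenerTaskEnd record carries "Executor CPU Time" (nanoseconds) and "Executor Run Time" (milliseconds), so per-executor CPU utilization can be approximated as total CPU time divided by total run time.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("eventlog-cpu").getOrCreate()

// The event log is newline-delimited JSON, one listener event per line.
val events = spark.read.json("/path/to/eventlog") // placeholder path

// Aggregate per executor: CPU time is reported in ns, run time in ms.
// Field names with spaces need backticks in column expressions.
val usage = events
  .filter(col("Event") === "SparkListenerTaskEnd")
  .groupBy(col("`Task Info`.`Executor ID`").as("executor"))
  .agg(
    (sum(col("`Task Metrics`.`Executor CPU Time`")) / 1e9).as("cpu_seconds"),
    (sum(col("`Task Metrics`.`Executor Run Time`")) / 1e3).as("run_seconds"))
  .withColumn("cpu_utilization", col("cpu_seconds") / col("run_seconds"))

usage.show()
```

Note this measures CPU only while tasks run, so idle executor time is not counted; for node-level utilization, OS/CloudWatch metrics on the EMR nodes may be a better fit, without any code change.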
Re: One click to run Spark on Kubernetes
Hi Bo Yang,

Would it be something along the lines of Apache Livy?

Thanks,
Prasad

On Tue, Feb 22, 2022 at 10:22 PM bo yang wrote:
> It is not a standalone Spark cluster. In some detail: it deploys a Spark Operator (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) and an extra REST service. When people submit a Spark application to that REST service, the REST service will create a CRD inside the Kubernetes cluster. Then the Spark Operator will pick up the CRD and launch the Spark application. The one-click tool intends to hide these details, so people can just submit Spark applications and not deal with too many deployment details.
>
> On Tue, Feb 22, 2022 at 8:09 PM Bitfox wrote:
>> Can it be a cluster installation of Spark, or just a standalone node?
>>
>> Thanks
>>
>> On Wed, Feb 23, 2022 at 12:06 PM bo yang wrote:
>>> Hi Spark Community,
>>>
>>> We built an open source tool to deploy and run Spark on Kubernetes with a one-click command. For example, on AWS, it can automatically create an EKS cluster, node group, NGINX ingress, and Spark Operator. Then you will be able to use curl or a CLI tool to submit a Spark application. After the deployment, you can also install Uber Remote Shuffle Service to enable Dynamic Allocation on Kubernetes.
>>>
>>> Anyone interested in using or working together on such a tool?
>>>
>>> Thanks,
>>> Bo

--
Regards,
Prasad Paravatha
Re: Profiling spark application
Hi,

It will require code changes, and I am looking at some third-party code. I am looking for something which I can just hook into the JVM to get the stats.

Thanks,
Prasad

On Thu, Jan 20, 2022 at 11:00 AM Sonal Goyal wrote:
> Hi Prasad,
>
> Have you checked the SparkListener - https://mallikarjuna_g.gitbooks.io/spark/content/spark-SparkListener.html ?
>
> Cheers,
> Sonal
> https://github.com/zinggAI/zingg
>
> On Thu, Jan 20, 2022 at 10:49 AM Prasad Bhalerao <prasadbhalerao1...@gmail.com> wrote:
>> Hello,
>>
>> Is there any way we can profile Spark applications to show the number of invocations of Spark APIs and their execution time, etc., just the way JProfiler shows all the details?
>>
>> Thanks,
>> Prasad
Profiling spark application
Hello,

Is there any way we can profile Spark applications to show the number of invocations of Spark APIs and their execution time, etc., just the way JProfiler shows all the details?

Thanks,
Prasad
Re: [ANNOUNCE] Apache Spark 3.2.0
https://www.apache.org/dyn/closer.lua/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.3.tgz

FYI, unable to download from this location. Also, I don't see the Hadoop 3.3 version in the dist.

> On Oct 19, 2021, at 9:39 AM, Bode, Meikel, NMA-CFD wrote:
>
> Many thanks! 😊
>
> From: Gengliang Wang
> Sent: Tuesday, 19 October 2021 16:16
> To: dev ; user
> Subject: [ANNOUNCE] Apache Spark 3.2.0
>
> Hi all,
>
> Apache Spark 3.2.0 is the third release of the 3.x line. With tremendous contribution from the open-source community, this release managed to resolve in excess of 1,700 Jira tickets.
>
> We'd like to thank our contributors and users for their contributions and early feedback to this release. This release would not have been possible without you.
>
> To download Spark 3.2.0, head over to the download page: https://spark.apache.org/downloads.html
>
> To view the release notes: https://spark.apache.org/releases/spark-release-3-2-0.html
Re: reporting use case
Hi, I am new to spark and no SQL databases. So Please correct me if I am wrong. Since I will be accessing multiple columns (almost 20-30 columns) of a row, I will have to go with rowbased db instead column based right! May be I can use Avro in this case. Does spark go well with Avroro? I will do my research on this. But please let me know your opinion on this. Thanks, Prasad On Fri 5 Apr, 2019, 1:09 AM Teemu Heikkilä So basically you could have base dump/snapshot of the full database - or > all the required data stored into HDFS or similar system as partitioned > files (ie. orc/parquet) > > Then you use the change stream after the dump and join it on the snapshot > - similarly than what your database is doing. > After that you can build the aggregates and reports from that table. > > - T > > On 4 Apr 2019, at 22.35, Prasad Bhalerao > wrote: > > I did not understand this "update actual snapshots ie. by joining the > data". > > > There is another microservice which updates these Oracle tables. I can > have this micro service to send the update data feed on Kafka topics. > > Thanks, > Prasad > > On Fri 5 Apr, 2019, 12:57 AM Teemu Heikkilä >> Based on your answers, I would consider using the update stream to update >> actual snapshots ie. by joining the data >> >> Ofcourse now it depends on how the update stream has been implemented how >> to get the data in spark. >> >> Could you tell little bit more about that? >> - Teemu >> >> On 4 Apr 2019, at 22.23, Prasad Bhalerao >> wrote: >> >> Hi , >> >> I can create a view on these tables but the thing is I am going to need >> almost every column from these tables and I have faced issues with oracle >> views on such a large tables which involves joins. Some how oracle used to >> choose not so correct execution plan. >> >> Can you please tell me how creating a views will help in this scenario? >> >> Can you please tell if I am thinking in right direction? 
>> >> I have two challenges >> 1) First to load 2-4 TB of data in spark very quickly. >> 2) And then keep this data updated in spark whenever DB updates are done. >> >> Thanks, >> Prasad >> >> On Fri, Apr 5, 2019 at 12:35 AM Jason Nerothin >> wrote: >> >>> Hi Prasad, >>> >>> Could you create an Oracle-side view that captures only the relevant >>> records and the use Spark JDBC connector to load the view into Spark? >>> >>> On Thu, Apr 4, 2019 at 1:48 PM Prasad Bhalerao < >>> prasadbhalerao1...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I am exploring spark for my Reporting application. >>>> My use case is as follows... >>>> I have 4-5 oracle tables which contains more than 1.5 billion rows. >>>> These tables are updated very frequently every day. I don't have choice to >>>> change database technology. So this data is going to remain in Oracle only. >>>> To generate 1 report, on an average 15 - 50 million rows has to be >>>> fetched from oracle tables. These rows contains some blob columns. Most of >>>> the time is spent in fetching these many rows from db over the network. >>>> Data processing is not that complex. Currently these report takes around >>>> 3-8 hours to complete. I trying to speed up this report generation process. >>>> >>>> Can use spark as a caching layer in this case to avoid fetching data >>>> from oracle over the network every time? I am thinking to submit a spark >>>> job for each report request and use spark SQL to fetch the data and then >>>> process it and write to a file? I trying to use kind of data locality in >>>> this case. >>>> >>>> Whenever a data is updated in oracle tables can I refresh the data in >>>> spark storage? I can get the update feed using messaging technology. >>>> >>>> Can some one from community help me with this? >>>> Suggestions are welcome. >>>> >>>> >>>> Thanks, >>>> Prasad >>>> >>>> >>>> >>>> Thanks, >>>> Prasad >>>> >>> >>> >>> -- >>> Thanks, >>> Jason >>> >> >> >
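Regarding the Avro question above: Spark reads Avro well through the databricks spark-avro package. A minimal sketch (Spark 1.x API; the package must be on the classpath, and the input path below is a hypothetical placeholder):

```scala
// Minimal sketch: reading Avro files into a DataFrame with the
// com.databricks:spark-avro package (Spark 1.x API).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("avro-read"))
val sqlContext = new SQLContext(sc)

// Avro is row-oriented, so a query touching 20-30 columns per row
// reads each record once instead of stitching columns back together.
val users = sqlContext.read
  .format("com.databricks.spark.avro")
  .load("hdfs:///data/users")   // hypothetical input directory

users.printSchema()
```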
Re: reporting use case
Hi, I can create a view on these tables, but the thing is I am going to need almost every column from them, and I have faced issues with Oracle views on such large tables involving joins. Somehow Oracle would choose a not-so-good execution plan. Can you please tell me how creating a view will help in this scenario? Can you please tell me if I am thinking in the right direction? I have two challenges: 1) First, to load 2-4 TB of data into Spark very quickly. 2) Then, to keep this data updated in Spark whenever DB updates are done. Thanks, Prasad On Fri, Apr 5, 2019 at 12:35 AM Jason Nerothin wrote: > Hi Prasad, > > Could you create an Oracle-side view that captures only the relevant > records and the use Spark JDBC connector to load the view into Spark? > > On Thu, Apr 4, 2019 at 1:48 PM Prasad Bhalerao < > prasadbhalerao1...@gmail.com> wrote: > >> Hi, >> >> I am exploring spark for my Reporting application. >> My use case is as follows... >> I have 4-5 oracle tables which contains more than 1.5 billion rows. These >> tables are updated very frequently every day. I don't have choice to change >> database technology. So this data is going to remain in Oracle only. >> To generate 1 report, on an average 15 - 50 million rows has to be >> fetched from oracle tables. These rows contains some blob columns. Most of >> the time is spent in fetching these many rows from db over the network. >> Data processing is not that complex. Currently these report takes around >> 3-8 hours to complete. I trying to speed up this report generation process. >> >> Can use spark as a caching layer in this case to avoid fetching data from >> oracle over the network every time? I am thinking to submit a spark job for >> each report request and use spark SQL to fetch the data and then process it >> and write to a file? I trying to use kind of data locality in this case. >> >> Whenever a data is updated in oracle tables can I refresh the data in >> spark storage? 
I can get the update feed using messaging technology. >> >> Can some one from community help me with this? >> Suggestions are welcome. >> >> >> Thanks, >> Prasad >> >> >> >> Thanks, >> Prasad >> > > > -- > Thanks, > Jason >
reporting use case
Hi, I am exploring Spark for my reporting application. My use case is as follows: I have 4-5 Oracle tables which contain more than 1.5 billion rows. These tables are updated very frequently every day. I don't have a choice to change database technology, so this data is going to remain in Oracle. To generate one report, on average 15-50 million rows have to be fetched from the Oracle tables. These rows contain some BLOB columns. Most of the time is spent fetching these rows from the DB over the network; the data processing itself is not that complex. Currently these reports take around 3-8 hours to complete, and I am trying to speed up the report generation process. Can I use Spark as a caching layer in this case, to avoid fetching data from Oracle over the network every time? I am thinking of submitting a Spark job for each report request and using Spark SQL to fetch the data, process it, and write it to a file. I am trying to get a kind of data locality this way. Whenever data is updated in the Oracle tables, can I refresh the data in Spark storage? I can get the update feed using messaging technology. Can someone from the community help me with this? Suggestions are welcome. Thanks, Prasad
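A minimal sketch of the caching idea (Spark 1.x API): load the Oracle table in parallel over JDBC and cache the snapshot so later report jobs don't go back over the network. The JDBC URL, table name, partition column, and bounds below are all hypothetical placeholders.

```scala
// Sketch: parallel JDBC load from Oracle, then cache the snapshot in Spark.
// URL, table, column names, and bounds are hypothetical placeholders.
import org.apache.spark.sql.SQLContext

def loadSnapshot(sqlContext: SQLContext) = {
  val snapshot = sqlContext.read.format("jdbc").options(Map(
    "url"             -> "jdbc:oracle:thin:@//dbhost:1521/service",
    "driver"          -> "oracle.jdbc.OracleDriver",
    "dbtable"         -> "REPORT_SOURCE",
    // split the scan into 200 concurrent range queries on a numeric key
    "partitionColumn" -> "ID",
    "lowerBound"      -> "1",
    "upperBound"      -> "1500000000",
    "numPartitions"   -> "200"
  )).load()

  snapshot.cache()                            // keep in memory for later reports
  snapshot.registerTempTable("report_source") // query it with Spark SQL
  snapshot
}
```

The update feed (e.g. from Kafka) could then be joined against this cached snapshot to keep it current, as suggested elsewhere in the thread.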
Re: Warning from user@spark.apache.org
Hello, I got a message saying that messages sent to me (my Gmail id) from the mailing list got bounced. I wonder why? Thanks, Prasad. On Mon, Apr 16, 2018 at 6:16 PM, wrote: > Hi! This is the ezmlm program. I'm managing the > user@spark.apache.org mailing list. > > > Messages to you from the user mailing list seem to > have been bouncing. I've attached a copy of the first bounce > message I received. > > If this message bounces too, I will send you a probe. If the probe bounces, > I will remove your address from the user mailing list, > without further notice. > > > I've kept a list of which messages from the user mailing list have > bounced from your address. > > Copies of these messages may be in the archive. > To retrieve a set of messages 123-145 (a maximum of 100 per request), > send a short message to: > > > To receive a subject and author list for the last 100 or so messages, > send a short message to: > > > Here are the message numbers: > >74336 > > --- Enclosed is a copy of the bounce message I received. > > Return-Path: <> > Received: (qmail 55901 invoked for bounce); 6 Apr 2018 23:03:41 - > Date: 6 Apr 2018 23:03:41 - > From: mailer-dae...@apache.org > To: user-return-743...@spark.apache.org > Subject: failure notice > >
Running Hive Beeline .hql file in Spark
Hi, Currently we are running Hive Beeline queries as below. *Beeline :-* beeline -u "jdbc:hive2://localhost:1/default;principal=hive/_HOST@ nsroot.net" --showHeader=false --silent=true --outputformat=dsv --verbose=false -f /home/*sample.hql *> output_partition.txt Note: We run the Hive queries in *sample.hql* and redirect the output to the file output_partition.txt. *Spark:* Can anyone tell us how to implement this in *Spark SQL*, i.e. executing the sample.hql file and redirecting the output to one file? Regards Prasad
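One way to approximate the beeline invocation in Spark (a sketch, assuming a Spark 1.x HiveContext, an existing SparkContext `sc`, and naively splitting the script on `;`; paths are hypothetical) is to read the .hql file, run its statements through Spark SQL, and save the final result pipe-delimited, similar to --outputformat=dsv:

```scala
// Sketch: run the statements from sample.hql via HiveContext and write
// the final result pipe-delimited (dsv-like). Paths are hypothetical,
// and splitting on ';' assumes no semicolons inside the statements.
import scala.io.Source
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)   // assumes SparkContext `sc` exists

val statements = Source.fromFile("/home/sample.hql").mkString
  .split(";").map(_.trim).filter(_.nonEmpty)

val result = statements.map(hiveContext.sql).last  // result of the last query

result.rdd
  .map(_.mkString("|"))                 // "|" as the field delimiter
  .saveAsTextFile("/tmp/output_partition")
```

The spark-sql command-line tool also accepts a script file with `-f <file>`, whose output can be redirected to a file much like the beeline invocation above.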
Re: Writing Spark SQL output in Local and HDFS path
Hi, I tried the code below: result.write.csv("home/Prasad/") It is not working; it says: Error: value csv is not a member of org.apache.spark.sql.DataFrameWriter. Regards Prasad On Thu, Jan 19, 2017 at 4:35 PM, smartzjp wrote: > Because the number of reducers will not be one, the output will be a folder > on HDFS. You can use “result.write.csv(foldPath)”. > > > > -- > > Hi, > Can anyone please let us know how to write the output of the Spark SQL > in > Local and HDFS path using Scala code. > > *Code :-* > > scala> val result = sqlContext.sql("select empno , name from emp"); > scala > result.show(); > > If I give the command result.show() then It will print the output in the > console. > I need to redirect the output in local file as well as HDFS file. > with the delimiter as "|". > > We tried with the below code > result.saveAsTextFile ("home/Prasad/result.txt") > It is not working as expected. > > > -- > ------ > Prasad. T > -- -- Regards, RAVI PRASAD. T
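For what it's worth, `DataFrameWriter.csv` only exists from Spark 2.0 onward, which explains the "not a member" error on 1.x. Two sketches that avoid it (output paths below are hypothetical placeholders):

```scala
// Sketch for Spark 1.x, where DataFrameWriter has no .csv method.
val result = sqlContext.sql("select empno, name from emp")

// Option 1: the spark-csv package, if it is on the classpath,
// writing pipe-delimited output to an HDFS directory.
result.write
  .format("com.databricks.spark.csv")
  .option("delimiter", "|")
  .save("hdfs:///user/prasad/result")            // hypothetical HDFS directory

// Option 2: plain text files, pipe-delimited, to a local path.
result.rdd
  .map(_.mkString("|"))                          // "|" as the delimiter
  .saveAsTextFile("file:///home/prasad/result")  // note the file:// scheme
```

Either way the destination is a directory of part files, not a single file, because each partition writes its own output.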
Writing Spark SQL output in Local and HDFS path
Hi, Can anyone please let us know how to write the output of Spark SQL to a local and an HDFS path using Scala code? *Code :-* scala> val result = sqlContext.sql("select empno, name from emp"); scala> result.show(); If I give the command result.show(), it prints the output to the console. I need to redirect the output to a local file as well as an HDFS file, with "|" as the delimiter. We tried the code below: result.saveAsTextFile ("home/Prasad/result.txt") It is not working as expected. -- ------ Prasad. T
Re: Cant join same dataframe twice ?
Also, check the column names of df1 ( after joining df2 and df3 ). Prasad. From: Ted Yu Date: Monday, April 25, 2016 at 8:35 PM To: Divya Gehlot Cc: "user @spark" Subject: Re: Cant join same dataframe twice ? Can you show us the structure of df2 and df3 ? Thanks On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot mailto:divya.htco...@gmail.com>> wrote: Hi, I am using Spark 1.5.2 . I have a use case where I need to join the same dataframe twice on two different columns. I am getting error missing Columns For instance , val df1 = df2.join(df3,"Column1") Below throwing error missing columns val df 4 = df1.join(df3,"Column2") Is the bug or valid scenario ? Thanks, Divya
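A common workaround, sketched below with the thread's names (I'm assuming the "missing column" error comes from ambiguous duplicate column names after the first join), is to rename the second copy's columns before joining again:

```scala
// Sketch: join df3 twice by renaming the overlapping columns of the
// second copy, so the analyzer can resolve each reference unambiguously.
val df1 = df2.join(df3, "Column1")

val df3Renamed = df3
  .withColumnRenamed("Column1", "Column1_r")
  .withColumnRenamed("Column2", "Column2_r")

val df4 = df1.join(df3Renamed, df1("Column2") === df3Renamed("Column2_r"))
```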
DataFrame withColumnRenamed throwing NullPointerException
I am joining two data frames as shown in the code below. This is throwing a NullPointerException. I have a number of different joins throughout the program, and the NullPointerException is thrown randomly on one of them. The two data frames are very large (around 1 TB). I am using Spark version 1.5.2. Thanks in advance for any insights. Regards, Prasad. Below is the code. val userAndFmSegment = userData.as("userdata").join(fmSegmentData.withColumnRenamed("USER_ID", "FM_USER_ID").as("fmsegmentdata"), $"userdata.PRIMARY_USER_ID" === $"fmsegmentdata.FM_USER_ID" && $"fmsegmentdata.END_DATE" >= date_sub($"userdata.REPORT_DATE", trailingWeeks * 7) && $"fmsegmentdata.START_DATE" <= date_sub($"userdata.REPORT_DATE", trailingWeeks * 7) , "inner").select( "USER_ID", "PRIMARY_USER_ID", "FM_BUYER_TYPE_CD" ) Log 16/01/05 17:41:19 ERROR ApplicationMaster: User class threw exception: java.lang.NullPointerException java.lang.NullPointerException at org.apache.spark.sql.DataFrame.withColumnRenamed(DataFrame.scala:1161) at DnaAgg$.getUserIdAndFMSegmentId$1(DnaAgg.scala:294) at DnaAgg$.main(DnaAgg.scala:339) at DnaAgg.main(DnaAgg.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
Re: Negative Number of Active Tasks in Spark UI
I am using Spark 1.5.2. I am not using Dynamic allocation. Thanks, Prasad. On 1/5/16, 3:24 AM, "Ted Yu" wrote: >Which version of Spark do you use ? > >This might be related: >https://issues.apache.org/jira/browse/SPARK-8560 > > >Do you use dynamic allocation ? > >Cheers > >> On Jan 4, 2016, at 10:05 PM, Prasad Ravilla wrote: >> >> I am seeing negative active tasks in the Spark UI. >> >> Is anyone seeing this? >> How is this possible? >> >> Thanks, >> Prasad. >> >> >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org
Re: Joining DataFrames - Causing Cartesian Product
Changing equality check from “<=>”to “===“ solved the problem. Performance skyrocketed. I am wondering why “<=>” cause a performance degrade? val dates = new RetailDates() val dataStructures = new DataStructures() // Reading CSV Format input files -- retailDates // This DF has 75 records val retailDatesWithSchema = sqlContext.read .format("com.databricks.spark.csv") .option("delimiter", ",") .schema(dates.retailDatesSchema) .load(datesFile) .coalesce(1) .cache() // Create UDF to convert String to Date val dateUDF: (String => java.sql.Date) = (dateString: String) => new java.sql.Date(customerDateFormat.parse(dateString).getTime()) val stringToDateUDF = udf(dateUDF) // Reading Avro Format Input Files // This DF has 500 million records val userInputDf = sqlContext.read.avro(“customerLocation") val userDf = userInputDf.withColumn("CAL_DT", stringToDateUDF(col("CAL_DT"))).select( "CAL_DT","USER_ID","USER_CNTRY_ID" ) val userDimDf = sqlContext.read.avro(userDimFiles).select("USER_ID","USER_CNTRY_ID","PRIMARY_USER_ID") // This DF has 800 million records val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema) val userDimDfBroadcast = sc.broadcast(userDimDf) val userAndRetailDates = userDnaSdDf .join((retailDatesWithSchemaBroadcast.value).as("retailDates"), userDf("CAL_DT") between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE") , "inner") val userAndRetailDatesAndUserDim = userAndRetailDates .join((userDimDfBroadcast.value) .withColumnRenamed("USER_ID", "USER_DIM_USER_ID") .withColumnRenamed("USER_CNTRY_ID","USER_DIM_COUNTRY_ID") .as("userdim") , userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID" && userAndRetailDates("USER_CNTRY_ID") <=> $"userdim.USER_DIM_COUNTRY_ID" , "inner") userAndRetailDatesAndUserDim.show() From: Prasad Ravilla Date: Friday, December 18, 2015 at 7:38 AM To: user Subject: Joining DataFrames - Causing Cartesian Product Hi, I am running into performance issue when joining data frames created from avro files 
using spark-avro library. The data frames are created from 120K avro files and the total size is around 1.5 TB. The two data frames are very huge with billions of records. The join for these two DataFrames runs forever. This process runs on a yarn cluster with 300 executors with 4 executor cores and 8GB memory. Any insights on this join will help. I have posted the explain plan below. I notice a CartesianProduct in the Physical Plan. I am wondering if this is causing the performance issue. Below is the logical plan and the physical plan. ( Due to the confidential nature, I am unable to post any of the column names or the file names here ) == Optimized Logical Plan == Limit 21 Join Inner, [ Join Conditions ] Join Inner, [ Join Conditions ] Project [ List of columns ] Relation [ List of columns ] AvroRelation[ fileName1 ] -- Another large file InMemoryRelation [List of columsn ], true, 1, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None Project [ List of Columns ] Relation[ List of Columns] AvroRelation[ filename2 ] -- This is a very large file == Physical Plan == Limit 21 Filter (filter conditions) CartesianProduct Filter (more filter conditions) CartesianProduct Project (selecting a few columns and applying a UDF to one column) Scan AvroRelation[avro file][ columns in Avro File ] InMemoryColumnarTableScan [List of columns ], true, 1, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None) Project [ List of Columns ] Scan AvroRelation[Avro File][List of Columns] Code Generation: true Thanks, Prasad.
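A plausible explanation for the difference above: `<=>` is the null-safe equality operator (NULL <=> NULL evaluates to true), and in Spark of this vintage the planner apparently did not recognize it as an equi-join key, so the join degraded to a Cartesian product plus filter, while `===` is recognized and enables a shuffle-based join. A sketch, reusing the thread's DataFrame names, that makes the difference visible in the plan:

```scala
// Sketch: with ===, the planner can extract equi-join keys and pick a
// shuffle-based join; inspect the plan to confirm no CartesianProduct.
val userDim = userDimDf
  .withColumnRenamed("USER_ID", "USER_DIM_USER_ID")
  .withColumnRenamed("USER_CNTRY_ID", "USER_DIM_COUNTRY_ID")

val joined = userAndRetailDates.join(
  userDim,
  userAndRetailDates("USER_ID") === userDim("USER_DIM_USER_ID") &&
  userAndRetailDates("USER_CNTRY_ID") === userDim("USER_DIM_COUNTRY_ID"),
  "inner")

joined.explain()   // the physical plan should no longer contain CartesianProduct
```

Note that `===` drops rows where a join key is NULL on either side; if null-matching semantics are actually needed, that has to be handled explicitly (e.g. by coalescing keys to a sentinel value).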
Joining DataFrames - Causing Cartesian Product
Hi, I am running into performance issue when joining data frames created from avro files using spark-avro library. The data frames are created from 120K avro files and the total size is around 1.5 TB. The two data frames are very huge with billions of records. The join for these two DataFrames runs forever. This process runs on a yarn cluster with 300 executors with 4 executor cores and 8GB memory. Any insights on this join will help. I have posted the explain plan below. I notice a CartesianProduct in the Physical Plan. I am wondering if this is causing the performance issue. Below is the logical plan and the physical plan. ( Due to the confidential nature, I am unable to post any of the column names or the file names here ) == Optimized Logical Plan == Limit 21 Join Inner, [ Join Conditions ] Join Inner, [ Join Conditions ] Project [ List of columns ] Relation [ List of columns ] AvroRelation[ fileName1 ] -- Another large file InMemoryRelation [List of columsn ], true, 1, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None Project [ List of Columns ] Relation[ List of Columns] AvroRelation[ filename2 ] -- This is a very large file == Physical Plan == Limit 21 Filter (filter conditions) CartesianProduct Filter (more filter conditions) CartesianProduct Project (selecting a few columns and applying a UDF to one column) Scan AvroRelation[avro file][ columns in Avro File ] InMemoryColumnarTableScan [List of columns ], true, 1, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None) Project [ List of Columns ] Scan AvroRelation[Avro File][List of Columns] Code Generation: true Thanks, Prasad.
Re: Large number of conf broadcasts
Thanks, Koert. Regards, Prasad. From: Koert Kuipers Date: Thursday, December 17, 2015 at 1:06 PM To: Prasad Ravilla Cc: Anders Arpteg, user Subject: Re: Large number of conf broadcasts https://github.com/databricks/spark-avro/pull/95<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_databricks_spark-2Davro_pull_95&d=CwMFaQ&c=fa_WZs7nNMvOIDyLmzi2sMVHyyC4hN9WQl29lWJQ5Y4&r=-5JY3iMOXXyFuBleKruCQ-6rGWyZEyiHu8ySSzJdEHw&m=9AjxHvmieZttugnxWogbT7lOTg1hVM6cMVLj6tfukY4&s=mDYfa3wyqnL6HBitNnJzuriOYqY5e8l7cgMnUgjx96s&e=> On Thu, Dec 17, 2015 at 3:35 PM, Prasad Ravilla mailto:pras...@slalom.com>> wrote: Hi Anders, I am running into the same issue as yours. I am trying to read about 120 thousand avro files into a single data frame. Is your patch part of a pull request from the master branch in github? Thanks, Prasad. From: Anders Arpteg Date: Thursday, October 22, 2015 at 10:37 AM To: Koert Kuipers Cc: user Subject: Re: Large number of conf broadcasts Yes, seems unnecessary. I actually tried patching the com.databricks.spark.avro reader to only broadcast once per dataset, instead of every single file/partition. It seems to work just as fine, and there are significantly less broadcasts and not seeing out of memory issues any more. Strange that more people does not react to this, since the broadcasting seems completely unnecessary... Best, Anders On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers mailto:ko...@tresata.com>> wrote: i am seeing the same thing. its gona completely crazy creating broadcasts for the last 15 mins or so. killing it... On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg mailto:arp...@spotify.com>> wrote: Hi, Running spark 1.5.0 in yarn-client mode, and am curios in why there are so many broadcast being done when loading datasets with large number of partitions/files. Have datasets with thousands of partitions, i.e. hdfs files in the avro folder, and sometime loading hundreds of these large datasets. 
Believe I have located the broadcast to line SparkContext.scala:1006. It seems to just broadcast the hadoop configuration, and I don't see why it should be necessary to broadcast that for EVERY file? Wouldn't it be possible to reuse the same broadcast configuration? It hardly the case the the configuration would be different between each file in a single dataset. Seems to be wasting lots of memory and needs to persist unnecessarily to disk (see below again). Thanks, Anders 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to disk [19/49086]15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 10.254.35.24:49428<https://urldefense.proofpoint.com/v2/url?u=http-3A__10.254.35.24-3A49428&d=AAMFaQ&c=fa_WZs7nNMvOIDyLmzi2sMVHyyC4hN9WQl29lWJQ5Y4&r=-5JY3iMOXXyFuBleKruCQ-6rGWyZEyiHu8ySSzJdEHw&m=l2yANY7xVKKwiFwzeDzKhyU0PGja-46MWiTFMCmhYH8&s=JWqID_Bk5XTujNC34_AAgssnJp-X3ocZ79BgAwGOLbQ&e=> (size: 23.1 KB) 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes in memory (estimated size 23.1 KB, free 2.4 KB) 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory on 10.254.35.24:49428<https://urldefense.proofpoint.com/v2/url?u=http-3A__10.254.35.24-3A49428&d=AAMFaQ&c=fa_WZs7nNMvOIDyLmzi2sMVHyyC4hN9WQl29lWJQ5Y4&r=-5JY3iMOXXyFuBleKruCQ-6rGWyZEyiHu8ySSzJdEHw&m=l2yANY7xVKKwiFwzeDzKhyU0PGja-46MWiTFMCmhYH8&s=JWqID_Bk5XTujNC34_AAgssnJp-X3ocZ79BgAwGOLbQ&e=> (size: 23.1 KB, free: 464.0 MB) 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from hadoopFile at AvroRelation.scala:121 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_4804 in memory . 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in memory! (computed 496.0 B so far) 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. 
Storage limit = 530.3 MB. 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk instead. 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with curMem=556036460, maxMem=556038881 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from memory 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk
Re: Large number of conf broadcasts
Hi Anders, I am running into the same issue as yours. I am trying to read about 120 thousand avro files into a single data frame. Is your patch part of a pull request from the master branch in github? Thanks, Prasad. From: Anders Arpteg Date: Thursday, October 22, 2015 at 10:37 AM To: Koert Kuipers Cc: user Subject: Re: Large number of conf broadcasts Yes, seems unnecessary. I actually tried patching the com.databricks.spark.avro reader to only broadcast once per dataset, instead of every single file/partition. It seems to work just as fine, and there are significantly less broadcasts and not seeing out of memory issues any more. Strange that more people does not react to this, since the broadcasting seems completely unnecessary... Best, Anders On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers mailto:ko...@tresata.com>> wrote: i am seeing the same thing. its gona completely crazy creating broadcasts for the last 15 mins or so. killing it... On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg mailto:arp...@spotify.com>> wrote: Hi, Running spark 1.5.0 in yarn-client mode, and am curios in why there are so many broadcast being done when loading datasets with large number of partitions/files. Have datasets with thousands of partitions, i.e. hdfs files in the avro folder, and sometime loading hundreds of these large datasets. Believe I have located the broadcast to line SparkContext.scala:1006. It seems to just broadcast the hadoop configuration, and I don't see why it should be necessary to broadcast that for EVERY file? Wouldn't it be possible to reuse the same broadcast configuration? It hardly the case the the configuration would be different between each file in a single dataset. Seems to be wasting lots of memory and needs to persist unnecessarily to disk (see below again). 
Thanks, Anders 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to disk [19/49086]15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 10.254.35.24:49428<https://urldefense.proofpoint.com/v2/url?u=http-3A__10.254.35.24-3A49428&d=AAMFaQ&c=fa_WZs7nNMvOIDyLmzi2sMVHyyC4hN9WQl29lWJQ5Y4&r=-5JY3iMOXXyFuBleKruCQ-6rGWyZEyiHu8ySSzJdEHw&m=l2yANY7xVKKwiFwzeDzKhyU0PGja-46MWiTFMCmhYH8&s=JWqID_Bk5XTujNC34_AAgssnJp-X3ocZ79BgAwGOLbQ&e=> (size: 23.1 KB) 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes in memory (estimated size 23.1 KB, free 2.4 KB) 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory on 10.254.35.24:49428<https://urldefense.proofpoint.com/v2/url?u=http-3A__10.254.35.24-3A49428&d=AAMFaQ&c=fa_WZs7nNMvOIDyLmzi2sMVHyyC4hN9WQl29lWJQ5Y4&r=-5JY3iMOXXyFuBleKruCQ-6rGWyZEyiHu8ySSzJdEHw&m=l2yANY7xVKKwiFwzeDzKhyU0PGja-46MWiTFMCmhYH8&s=JWqID_Bk5XTujNC34_AAgssnJp-X3ocZ79BgAwGOLbQ&e=> (size: 23.1 KB, free: 464.0 MB) 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from hadoopFile at AvroRelation.scala:121 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_4804 in memory . 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in memory! (computed 496.0 B so far) 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage limit = 530.3 MB. 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk instead. 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with curMem=556036460, maxMem=556038881 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from memory 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk
Re: spark.authenticate=true YARN mode doesn't work
I did try. Same problem. As you said earlier, spark.yarn.keytab and spark.yarn.principal are required. On Fri, Dec 4, 2015 at 7:25 PM, Ted Yu wrote: > Did you try setting "spark.authenticate.secret" ? > > Cheers > > On Fri, Dec 4, 2015 at 7:07 PM, Prasad Reddy wrote: > >> Hi Ted, >> >> Thank you for the reply. >> >> I am using 1.5.2. >> >> I am implementing SASL encryption. Authentication is required to >> implement SASL Encryption. >> >> I have configured like below in Spark-default.conf >> >> spark.authenticate true >> >> spark.authenticate.enableSaslEncryption true >> >> spark.network.sasl.serverAlwaysEncrypt true >> >> >> Any help will be appreciated. >> >> >> >> Thanks >> >> Prasad >> >> >> >> On Fri, Dec 4, 2015 at 5:55 PM, Ted Yu wrote: >> >>> Which release are you using ? >>> >>> Please take a look at >>> https://spark.apache.org/docs/latest/running-on-yarn.html >>> There're several config parameters related to security: >>> spark.yarn.keytab >>> spark.yarn.principal >>> ... >>> >>> FYI >>> >>> On Fri, Dec 4, 2015 at 5:47 PM, prasadreddy >>> wrote: >>> >>>> Hi All, >>>> >>>> I am running Spark YARN and trying to enable authentication by setting >>>> spark.authenticate=true. After enable authentication I am not able to >>>> Run >>>> Spark word count or any other programs. >>>> >>>> Any help will be appreciated. >>>> >>>> Thanks >>>> >>>> >>>> >>>> -- >>>> View this message in context: >>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-authenticate-true-YARN-mode-doesn-t-work-tp25573.html >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> >>>> >>> >> >
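For reference, a minimal spark-defaults.conf sketch combining the settings from this thread. The keytab path and principal are hypothetical placeholders for your environment; all five property names are standard Spark/YARN settings.

```
spark.authenticate                       true
spark.authenticate.enableSaslEncryption  true
spark.network.sasl.serverAlwaysEncrypt   true
spark.yarn.keytab                        /etc/security/keytabs/spark.service.keytab
spark.yarn.principal                     spark/_HOST@EXAMPLE.COM
```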
Spark UI keeps redirecting to /null and returns 500
I am having problem in accessing spark UI while running in spark-client mode. It works fine in local mode. It keeps redirecting back to itself by adding /null at the end and ultimately run out of size limit for url and returns 500. Look at response below. I have a feeling that I might be missing some config, I played with various config settings for yarn with no success. I am using spark version 1.3.1 Any help will be greatly appreciated. --2015-09-09 11:22:17-- http://192.168.13.37:4040/ Connecting to 192.168.13.37:4040... connected. HTTP request sent, awaiting response... HTTP/1.1 302 Found Location: http://192.168.13.37:4040/null/ Content-Length: 0 Server: Jetty(8.y.z-SNAPSHOT) Location: http://192.168.13.37:4040/null/ [following] --2015-09-09 11:22:17-- http://192.168.13.37:4040/null/ Reusing existing connection to 192.168.13.37:4040. HTTP request sent, awaiting response... HTTP/1.1 302 Found Location: http://192.168.13.37:4040/null/null/null/ Content-Length: 0 Server: Jetty(8.y.z-SNAPSHOT) Location: http://192.168.13.37:4040/null/null/null/ [following] --2015-09-09 11:22:17-- http://192.168.13.37:4040/null/null/null/ Reusing existing connection to 192.168.13.37:4040. HTTP request sent, awaiting response... HTTP/1.1 302 Found Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/ Content-Length: 0 Server: Jetty(8.y.z-SNAPSHOT) Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/ [following] --2015-09-09 11:22:17-- http://192.168.13.37:4040/null/null/null/null/null/null/null/ Reusing existing connection to 192.168.13.37:4040. HTTP request sent, awaiting response... 
HTTP/1.1 302 Found Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/ Content-Length: 0 Server: Jetty(8.y.z-SNAPSHOT) Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/ [following] --2015-09-09 11:22:17-- http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/ Reusing existing connection to 192.168.13.37:4040. Here is stack dump: 15/09/09 11:22:18 WARN server.Response: Committed before 500 null 15/09/09 11:22:18 WARN server.AbstractHttpConnection: /null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null
/null/null/null/null/null/null/null/null/ [... the /null segment repeats hundreds more times until the URL exceeds the length limit ...]
Spark UI keep redirecting to /null and returns 500
Hi All,

I am having a problem accessing the Spark UI while running in spark-client mode. It works fine in local mode. The UI keeps redirecting back to itself, adding /null to the end of the URL each time, until it eventually exceeds the URL size limit and returns 500. Look at the trace below. I have a feeling that I might be missing some config; I played with various config settings for YARN with no success. I am using Spark version 1.3.1. Any help will be greatly appreciated.

--2015-09-09 11:22:17-- http://192.168.13.37:4040/
Connecting to 192.168.13.37:4040... connected.
HTTP request sent, awaiting response... HTTP/1.1 302 Found
Location: http://192.168.13.37:4040/null/
Content-Length: 0
Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/ [following]
--2015-09-09 11:22:17-- http://192.168.13.37:4040/null/
Reusing existing connection to 192.168.13.37:4040.
HTTP request sent, awaiting response... HTTP/1.1 302 Found
Location: http://192.168.13.37:4040/null/null/null/
Content-Length: 0
Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/null/null/ [following]
--2015-09-09 11:22:17-- http://192.168.13.37:4040/null/null/null/
Reusing existing connection to 192.168.13.37:4040.
HTTP request sent, awaiting response... HTTP/1.1 302 Found
Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/
Content-Length: 0
Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/ [following]
--2015-09-09 11:22:17-- http://192.168.13.37:4040/null/null/null/null/null/null/null/
Reusing existing connection to 192.168.13.37:4040.
HTTP request sent, awaiting response...
HTTP/1.1 302 Found
Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/
Content-Length: 0
Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/ [following]
--2015-09-09 11:22:17-- http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/
Reusing existing connection to 192.168.13.37:4040.

Here is the stack dump:

15/09/09 11:22:18 WARN server.Response: Committed before 500 null
15/09/09 11:22:18 WARN server.AbstractHttpConnection: /null/null/null/null/null/null/null/null/ (the /null path segments repeat several hundred more times, growing with each redirect until the URL length limit is exceeded)
RE: Can't access remote Hive table from spark
This happened to me as well; putting hive-site.xml inside conf doesn't seem to work. Instead I added /etc/hive/conf to SPARK_CLASSPATH and it worked. You can try this approach.

-Skanda

-----Original Message-----
From: "guxiaobo1982"
Sent: 25-01-2015 13:50
To: "user@spark.apache.org"
Subject: Can't access remote Hive table from spark

Hi,

I built and started a single-node standalone Spark 1.2.0 cluster along with a single-node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and Hive node I can create and query tables inside Hive, and from remote machines I can submit the SparkPi example to the Spark master. But I failed to run the following example code:

public class SparkTest {
    public static void main(String[] args) {
        String appName = "This is a test application";
        String master = "spark://lix1.bh.com:7077";
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
        //sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
        //sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");
        // Queries are expressed in HiveQL.
        List rows = sqlCtx.sql("FROM src SELECT key, value").collect();
        System.out.print("I got " + rows.size() + " rows \r\n");
        sc.close();
    }
}

Exception in thread "main" org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
    at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
    at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQ
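The workaround Skanda describes amounts to a one-line classpath change: point Spark at the cluster's live Hive client configuration directory rather than copying hive-site.xml into Spark's own conf. A minimal sketch, assuming the common Ambari/HDP layout where Hive's client config lives in /etc/hive/conf (adjust the path to your installation):

```shell
# Append Hive's client config directory to Spark's classpath,
# e.g. in conf/spark-env.sh. The /etc/hive/conf path is an assumption.
export SPARK_CLASSPATH="$SPARK_CLASSPATH:/etc/hive/conf"
```

With the Hive config directory on the classpath, HiveContext reads the metastore URIs from the cluster's hive-site.xml, so a query against a table like src resolves through the remote metastore instead of a fresh local Derby metastore that has no such table.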
Which version of Hive support Spark & Shark
Hi,

Can anyone please help me understand which version of Hive supports Spark and Shark?

--
Regards,
RAVI PRASAD. T
Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1
Hi Wisely, Could you please post your pom.xml here. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p3770.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Unable to read HDFS file -- SimpleApp.java
Check this thread out: http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p2807.html -- you have to remove the conflicting akka and protobuf versions.

Thanks
Prasad.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-read-HDFS-file-SimpleApp-java-tp1813p2853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1
Hi,

Yes, I did:

SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly

Further, when I use the spark-shell, I can read the same file and it works fine.

Thanks
Prasad.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p2199.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1
Hi,

I am getting the protobuf error below while reading an HDFS file using Spark 0.9.0 running on Hadoop 2.2.0. When I look through my jars, I find that I have both 2.4.1 and 2.5.0, and some blogs suggest that there are incompatibility issues between 2.4.1 and 2.5:

hduser@prasadHdp1:~/spark-0.9.0-incubating$ find ~/ -name protobuf-java*.jar
/home/hduser/.m2/repository/com/google/protobuf/protobuf-java/2.4.1/protobuf-java-2.4.1.jar
/home/hduser/.m2/repository/org/spark-project/protobuf/protobuf-java/2.4.1-shaded/protobuf-java-2.4.1-shaded.jar
/home/hduser/spark-0.9.0-incubating/lib_managed/bundles/protobuf-java-2.5.0.jar
/home/hduser/spark-0.9.0-incubating/lib_managed/jars/protobuf-java-2.4.1-shaded.jar
/home/hduser/.ivy2/cache/com.google.protobuf/protobuf-java/bundles/protobuf-java-2.5.0.jar
/home/hduser/.ivy2/cache/org.spark-project.protobuf/protobuf-java/jars/protobuf-java-2.4.1-shaded.jar

Can someone please let me know if you have faced these issues and how you fixed them?

Thanks
Prasad.

Caused by: java.lang.VerifyError: class org.apache.hadoop.security.proto.SecurityProtos$GetDelegationTokenRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
    at java.lang.Class.privateGetPublicMethods(Class.java:2651)
    at java.lang.Class.privateGetPublicMethods(Class.java:2661)
    at java.lang.Class.getMethods(Class.java:1467)
    at sun.misc.ProxyGenerator.generateClassFile(ProxyGenerator.java:426)
    at sun.misc.ProxyGenerator.generateProxyClass(ProxyGenerator.java:323)
    at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:636)
    at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:722)
    at org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(ProtobufRpcEngine.java:92)
    at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:537)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
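The VerifyError above is the classic symptom of two protobuf-java versions on one classpath. The earlier reply in this archive suggests removing the conflicting akka and protobuf versions; for a Maven build, one way to do that is a dependency exclusion so that only a single protobuf-java reaches the runtime classpath. This is only a hedged sketch, not a confirmed fix for this exact build: the hadoop-client coordinates are real, but whether you exclude from hadoop-client or from your Spark dependency depends on which side is pulling in the unwanted copy (check with mvn dependency:tree).

```xml
<!-- Sketch: exclude the transitively pulled protobuf-java copy,
     then pin the single version Hadoop 2.2.0 expects (2.5.0).
     Adjust artifacts/versions to match your own dependency tree. -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.2.0</version>
  <exclusions>
    <exclusion>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>
```

The same idea applies to an sbt build via excludeAll/dependencyOverrides; the point is that protobuf 2.4.1 and 2.5.0 must never coexist on the classpath, because 2.5.0 made getUnknownFields final and classes generated against one version fail verification against the other.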