Re: Spark / Scala conflict

2023-11-02 Thread Harry Jamison
Thanks Alonso,
I think this gives me some ideas.

My code is written in Python, and I use spark-submit to submit it.
I am not sure which part of the code is written in Scala; maybe the Phoenix driver, 
based on the stack trace?
How do I tell which version of Scala it was compiled against?

Is there a jar that I need to add to the Spark or HBase classpath?




On Thursday, November 2, 2023 at 01:38:21 AM PDT, Aironman DirtDiver wrote:

The error message Caused by: java.lang.ClassNotFoundException: 
scala.Product$class indicates that the Spark job is trying to load a class that 
is not available on the classpath. This usually means the job, or one of its 
dependencies, was compiled against a different Scala version than the one used 
to run it: the scala.Product$class trait implementation class exists only in 
Scala 2.11 and earlier, so code that references it fails on a Scala 2.12+ runtime.
You mentioned that you are using Spark 3.5.0, which is built against Scala 2.12 
by default (a separate Scala 2.13 build also exists). However, you have also 
tried Scala versions 2.10, 2.11, 2.12, and 2.13, which suggests that you may 
have multiple versions of Scala installed on your system.
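
If you want to confirm which Scala version the running Spark was actually built 
with (it does not normally show up in the logs), here is a minimal PySpark sketch. 
It assumes nothing beyond a working Spark install, and it goes through an internal 
Py4J handle, so treat it as a debugging aid rather than a public API:

from pyspark.sql import SparkSession

# Minimal sketch: report the Scala version the Spark JVM was built against.
spark = SparkSession.builder.appName("scala-version-check").getOrCreate()

# scala.util.Properties.versionString() is reachable through the Py4J gateway
# and returns something like "version 2.12.18".
print(spark.sparkContext._jvm.scala.util.Properties.versionString())

# The bundled scala-library jar also carries the version in its file name,
# e.g. $SPARK_HOME/jars/scala-library-2.12.x.jar.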
To resolve the issue, you need to make sure that the Spark job is compiled and 
run with the same version of Scala. You can do this by setting the 
SPARK_SCALA_VERSION environment variable to the desired Scala version before 
starting the Spark job.
For example, to compile the Spark job with Scala 2.12, you would run the 
following command:
SPARK_SCALA_VERSION=2.12 sbt compile

To run the Spark job with Scala 2.12, you would run the following command:
SPARK_SCALA_VERSION=2.12 spark-submit spark-job.jar

If you are using Databricks, you can set the Scala version for the Spark 
cluster in the cluster creation settings.
Once you have ensured that the Spark job is compiled and run with the same 
version of Scala, the error should be resolved.
Here are some additional tips for troubleshooting Scala version conflicts:
* Make sure that you are using the correct version of the Spark libraries. 
The Spark libraries must be compiled with the same version of Scala as the 
Spark job (see the jar-inspection sketch after this list).
* If you are using a third-party library, make sure that it is compatible 
with the version of Scala that you are using.
* Check the Spark logs for ClassNotFoundExceptions. The logs may indicate 
the specific class that is missing from the classpath.
* Use a tool such as sbt's dependencyTree task (or mvn dependency:tree for 
Maven builds) to view the dependencies of your Spark job. This can help you 
identify conflicting dependencies.
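
As a rough way to check which Scala line a given jar (for example the Phoenix 
connector) was compiled against, here is a hedged Python sketch; the file name in 
the comment is just an example, and the checks are heuristics, not guarantees:

import sys
import zipfile

# Hedged sketch: heuristics for guessing which Scala line a jar was built against.
# The jar path is taken from the command line, e.g.
#   python3 check_scala_jar.py phoenix-spark-5.0.0-HBase-2.0.jar
jar_path = sys.argv[1]

with zipfile.ZipFile(jar_path) as jar:
    names = jar.namelist()

    # Trait implementation classes ("...$class.class") are only emitted by
    # Scala 2.11 and earlier, which is what the missing scala/Product$class hints at.
    if any(n.endswith("$class.class") for n in names):
        print("Contains *$class.class entries -> built with Scala 2.11 or earlier")
    else:
        print("No *$class.class entries -> likely Scala 2.12+ (or not Scala at all)")

    # Many builds also bundle Maven metadata; Scala artifacts usually encode the
    # Scala binary version in the artifact id, e.g. something_2.12.
    for n in names:
        if n.startswith("META-INF/maven/") and n.endswith("pom.properties"):
            print(n)
            print(jar.read(n).decode("utf-8", "replace").strip())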

On Thu, 2 Nov 2023 at 5:39, Harry Jamison wrote:
> I am getting the error below when I try to run a Spark job connecting to 
> Phoenix. It seems like some part of the code expects a different Scala version 
> than the one I have.
> 
> I am using Spark 3.5.0, and I have copied these Phoenix jars into the Spark 
> lib directory:
> phoenix-server-hbase-2.5-5.1.3.jar  
> phoenix-spark-5.0.0-HBase-2.0.jar
> 
> I have tried Scala 2.10, 2.11, 2.12, and 2.13.
> I do not see the Scala version in the logs, so I am not 100% sure it is using 
> the version I expect.
> 
> 
> Here is the exception that I am getting:
> [stack trace trimmed; the full trace appears in the original message below]

Spark / Scala conflict

2023-11-01 Thread Harry Jamison
I am getting the error below when I try to run a Spark job connecting to 
Phoenix. It seems like some part of the code expects a different Scala version 
than the one I have.

I am using Spark 3.5.0, and I have copied these Phoenix jars into the Spark lib directory:
phoenix-server-hbase-2.5-5.1.3.jar  
phoenix-spark-5.0.0-HBase-2.0.jar

I have tried Scala 2.10, 2.11, 2.12, and 2.13.
I do not see the Scala version in the logs, so I am not 100% sure it is using 
the version I expect.


Here is the exception that I am getting

2023-11-01T16:13:00,391 INFO  [Thread-4] handler.ContextHandler: Started o.s.j.s.ServletContextHandler@15cd3b2a{/static/sql,null,AVAILABLE,@Spark}
Traceback (most recent call last):
  File "/hadoop/spark/spark-3.5.0-bin-hadoop3/copy_tables.py", line 10, in 
    .option("zkUrl", "namenode:2181").load()
  File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 314, in load
  File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.apache.phoenix.spark.PhoenixRelation.(PhoenixRelation.scala:29)
at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 20 more





Re: pyspark.ml.recommendation is using the wrong python version

2023-09-04 Thread Harry Jamison
That did not paste well, let me try again.

I am using Python 3.7 and Spark 2.4.7.
I am trying to figure out why my job is using the wrong Python version.
This is how it is starting up; the logs confirm that I am using Python 3.7, but I 
later see an error message showing it is trying to use 3.8, and I am not sure 
where it is picking that up.

SPARK_HOME = /usr/local/lib/python3.7/dist-packages/pyspark
Here is my command:
sudo --preserve-env -u spark pyspark --deploy-mode client --jars /opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p0.15945976/jars/phoenix5-spark-shaded-6.0.0.7.1.7.0-551.jar --verbose --py-files pullhttp/base_http_pull.py --master yarn
Python 3.7.17 (default, Jun  6 2023, 20:10:10) 
[GCC 9.4.0] on linux


And when I try to run als.fit on my training data I get this:

>>> model = als.fit(training)
[Stage 0:>                  (0 + 1) / 1]
23/09/04 21:42:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, datanode1, executor 2): org.apache.spark.SparkException: Error from python worker:
  Traceback (most recent call last):
    File "/usr/lib/python3.8/runpy.py", line 185, in _run_module_as_main
      mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
    File "/usr/lib/python3.8/runpy.py", line 111, in _get_module_details
      __import__(pkg_name)
    File "", line 991, in _find_and_load
    File "", line 975, in _find_and_load_unlocked
    File "", line 655, in _load_unlocked
    File "", line 618, in _load_backward_compatible
    File "", line 259, in load_module
    File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/__init__.py", line 51, in 
    File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 145, in 
    File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
  TypeError: an integer is required (got type bytes)
PYTHONPATH was:
/yarn/nm/usercache/spark/filecache/1130/__spark_libs__3536427065776590449.zip/spark-core_2.11-2.4.7.jar:/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/usr/local/lib/python3.7/dist-packages/pyspark/python/::/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/__pyfiles__:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/py4j-0.10.7-src.zip
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
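
For reference, the interpreter the executors launch is normally pinned explicitly, 
either with the PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON environment variables or the 
spark.pyspark.python conf, so they cannot fall back to whatever python3 the worker 
nodes default to. A hedged sketch; the 3.7 paths are assumptions and must exist on 
every node:

import os
import sys

from pyspark.sql import SparkSession

# Hedged sketch: pin the worker interpreter so the YARN executors do not fall
# back to the node default python3 (3.8 in this case).
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.7"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.7"

spark = (
    SparkSession.builder
    .appName("python-version-pin")
    # Conf equivalent of the environment variables (documented since Spark 2.1).
    .config("spark.pyspark.python", "/usr/bin/python3.7")
    .getOrCreate()
)

print("driver python:", sys.version)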

On Monday, September 4, 2023 at 10:08:56 PM PDT, Harry Jamison wrote:

[quoted message trimmed; same text as the re-paste above]

pyspark.ml.recommendation is using the wrong python version

2023-09-04 Thread Harry Jamison
I am using Python 3.7 and Spark 2.4.7.
I am trying to figure out why my job is using the wrong Python version.
This is how it is starting up; the logs confirm that I am using Python 3.7, but I 
later see an error message showing it is trying to use 3.8, and I am not sure 
where it is picking that up.

SPARK_HOME = /usr/local/lib/python3.7/dist-packages/pyspark
Here is my command:
sudo --preserve-env -u spark pyspark --deploy-mode client --jars /opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p0.15945976/jars/phoenix5-spark-shaded-6.0.0.7.1.7.0-551.jar --verbose --py-files pullhttp/base_http_pull.py --master yarn

Python 3.7.17 (default, Jun  6 2023, 20:10:10) 
[GCC 9.4.0] on linux


And when I try to run als.fit on my training data I get this:

>>> model = als.fit(training)
[Stage 0:>                  (0 + 1) / 1]
23/09/04 21:42:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, datanode1, executor 2): org.apache.spark.SparkException: Error from python worker:
  Traceback (most recent call last):
    File "/usr/lib/python3.8/runpy.py", line 185, in _run_module_as_main
      mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
    File "/usr/lib/python3.8/runpy.py", line 111, in _get_module_details
      __import__(pkg_name)
    File "", line 991, in _find_and_load
    File "", line 975, in _find_and_load_unlocked
    File "", line 655, in _load_unlocked
    File "", line 618, in _load_backward_compatible
    File "", line 259, in load_module
    File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/__init__.py", line 51, in 
    File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 145, in 
    File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
  TypeError: an integer is required (got type bytes)
PYTHONPATH was:
/yarn/nm/usercache/spark/filecache/1130/__spark_libs__3536427065776590449.zip/spark-core_2.11-2.4.7.jar:/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/usr/local/lib/python3.7/dist-packages/pyspark/python/::/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/__pyfiles__:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/py4j-0.10.7-src.zip
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout



Re: Spark 2.4.7

2023-08-26 Thread Harry Jamison
 
Thank you Varun, this makes sense.
I understand having a separate process for content ingestion. I was thinking it 
would be a separate Spark job, but it sounds like you are suggesting that ideally 
I should do it outside of Hadoop entirely?
Thanks

Harry



On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah wrote:

Hi Harry,

Ideally, you should not be fetching a URL inside your transformation job; do the 
API calls separately (outside the cluster if possible). Ingesting data should be 
treated separately from transformation / cleaning / join operations. You can 
create another dataframe of URLs, dedup it if required, and store it in a file. 
A normal Python function can then ingest the data for each URL; after X API 
calls, build a dataframe from the results and union it with the previous one, 
finally writing out the content and, if required, joining it back to the 
original dataframe on the URL.
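
A rough sketch of that flow (the paths, file formats, and the "url" column name 
are illustrative assumptions, not a prescription):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("separate-ingestion").getOrCreate()

# Original dataframe with a "url" column (illustrative source).
df = spark.read.csv("input.csv", header=True)

# 1) Deduplicate the URLs and hand them to a separate, non-Spark fetcher.
df.select("url").dropDuplicates().write.mode("overwrite").csv("/data/urls_to_fetch")

# 2) ... outside the cluster, a plain Python process reads /data/urls_to_fetch,
#    downloads each URL at whatever rate limit it needs, and writes
#    (url, content) records to /data/fetched_content ...

# 3) Back in Spark, join the fetched content onto the original dataframe.
content = spark.read.parquet("/data/fetched_content")   # columns: url, content
enriched = df.join(content, on="url", how="left")
enriched.write.mode("overwrite").parquet("/data/enriched")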

If this is absolutely necessary, here are a few ways to achieve this:

Approach-1:
You can use Spark's foreachPartition, which takes a plain Python function that is 
applied to each partition of the dataframe. Inside it, you can create a connection 
and limit the API calls made by that partition.

This works if you add logic that checks the current number of partitions and then 
distributes max_api_calls across them. For example, if no_of_partitions = 4 and 
total_max_api_calls = 4, you can pass the function a parameter 
max_partition_api_calls = 1.

This approach has a limitation: the maximum allowed number of API calls must be 
at least the number of partitions.
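
A hedged sketch of Approach-1; the source file, the "url" column name, and the 
four-calls-per-second budget are illustrative assumptions:

import time
import urllib.request

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rate-limited-fetch").getOrCreate()
urls_df = spark.read.csv("urls.csv", header=True)   # assumes a "url" column

TOTAL_MAX_CALLS_PER_SEC = 4.0
num_partitions = urls_df.rdd.getNumPartitions()
# Split the global budget across partitions, as described above.
per_partition_rate = TOTAL_MAX_CALLS_PER_SEC / max(num_partitions, 1)

def fetch_partition(rows):
    min_interval = 1.0 / per_partition_rate
    for row in rows:
        started = time.monotonic()
        try:
            with urllib.request.urlopen(row["url"], timeout=10) as resp:
                body = resp.read()
            # ... write `body` to HDFS / object storage / a sink table here ...
        except Exception as exc:
            print("failed to fetch %s: %s" % (row["url"], exc))
        # Sleep off whatever remains of this call's time slice.
        elapsed = time.monotonic() - started
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)

# foreachPartition runs for side effects only; it does not return a new column.
urls_df.foreachPartition(fetch_partition)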
Approach-2:
An alternative is to create the connection, wrapped in a rate limiter, outside 
the UDF and use that connection variable inside the UDF in each partition, 
invoking time.sleep as needed. This will still run into contention when many 
partitions try to invoke the API at the same time.
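
And a hedged sketch of Approach-2, which does return the content as a new column. 
The throttle state lives per Python worker process, so the effective global rate 
is only approximate; again, the file and column names are illustrative:

import time
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

CALLS_PER_SEC = 1.0          # per Python worker, not a true global limit
_last_call = [0.0]           # module-level state, one copy per worker process

def fetch(url):
    # Crude throttle: wait until this worker's next time slice is free.
    wait = (1.0 / CALLS_PER_SEC) - (time.monotonic() - _last_call[0])
    if wait > 0:
        time.sleep(wait)
    _last_call[0] = time.monotonic()
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return resp.read().decode("utf-8", "replace")
    except Exception:
        return None

fetch_udf = udf(fetch, StringType())

spark = SparkSession.builder.appName("fetch-udf").getOrCreate()
df = spark.read.csv("urls.csv", header=True)              # assumes a "url" column
df_with_content = df.withColumn("content", fetch_udf("url"))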
I also found a Medium article that discusses the issue you are facing, but it 
does not offer a solution; do check the comments there as well.

Regards,
Varun


On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison wrote:

I am using Python 3.7 and Spark 2.4.7.
I am not sure what the best way to do this is.
I have a dataframe with a URL in one of the columns, and I want to download the 
contents of that URL and put it in a new column.
Can someone point me in the right direction on how to do this? I looked at UDFs 
and they seem confusing to me.
Also, is there a good way to rate limit the number of calls I make per second?

Spark 2.4.7

2023-08-25 Thread Harry Jamison
I am using Python 3.7 and Spark 2.4.7.
I am not sure what the best way to do this is.
I have a dataframe with a URL in one of the columns, and I want to download the 
contents of that URL and put it in a new column.
Can someone point me in the right direction on how to do this? I looked at UDFs 
and they seem confusing to me.
Also, is there a good way to rate limit the number of calls I make per second?