Re: Spark / Scala conflict
Thanks Alonso, I think this gives me some ideas. My code is written in Python, and I use spark-submit to submit it. I am not sure what code is written in Scala. Maybe the Phoenix driver, based on the stack trace? How do I tell which version of Scala it was compiled against? Is there a jar that I need to add to the Spark or HBase classpath?

On Thursday, November 2, 2023 at 01:38:21 AM PDT, Aironman DirtDiver wrote:

The error message "Caused by: java.lang.ClassNotFoundException: scala.Product$class" indicates that the Spark job is trying to load a class that is not available on the classpath. This can happen when the Spark job is compiled with a different version of Scala than the one used to run it.

You have mentioned that you are using Spark 3.5.0, which is compatible with Scala 2.12. However, you have also mentioned that you have tried Scala versions 2.10, 2.11, 2.12, and 2.13. This suggests that you may have multiple versions of Scala installed on your system.

To resolve the issue, you need to make sure that the Spark job is compiled and run with the same version of Scala. You can do this by setting the SPARK_SCALA_VERSION environment variable to the desired Scala version before starting the Spark job. For example, to compile the Spark job with Scala 2.12, you would run:

SPARK_SCALA_VERSION=2.12 sbt compile

To run the Spark job with Scala 2.12, you would run:

SPARK_SCALA_VERSION=2.12 spark-submit spark-job.jar

If you are using Databricks, you can set the Scala version for the Spark cluster in the cluster creation settings.

Once you have ensured that the Spark job is compiled and run with the same version of Scala, the error should be resolved. Here are some additional tips for troubleshooting Scala version conflicts:

* Make sure that you are using the correct version of the Spark libraries. The Spark libraries must be compiled with the same version of Scala as the Spark job.
* If you are using a third-party library, make sure that it is compatible with the version of Scala that you are using.
* Check the Spark logs for any ClassNotFoundExceptions. The logs may indicate the specific class that is missing from the classpath.
* Use a tool like sbt's dependencyTree task (or mvn dependency:tree for Maven builds) to view the dependencies of your Spark job. This can help you identify conflicting dependencies.

El jue, 2 nov 2023 a las 5:39, Harry Jamison () escribió:

> I am getting the error below when I try to run a spark job connecting to
> Phoenix. It seems like some part of the code is expecting a different
> Scala version than the one I have.
>
> I am using Spark 3.5.0, and I have copied these Phoenix jars into the
> Spark lib:
> phoenix-server-hbase-2.5-5.1.3.jar
> phoenix-spark-5.0.0-HBase-2.0.jar
>
> I have tried Scala 2.10, 2.11, 2.12, and 2.13.
> I do not see the Scala version used in the logs, so I am not 100% sure
> that it is using the version I expect.
>
> Here is the exception that I am getting:
>
> 2023-11-01T16:13:00,391 INFO [Thread-4] handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@15cd3b2a{/static/sql,null,AVAILABLE,@Spark}
> Traceback (most recent call last):
>   File "/hadoop/spark/spark-3.5.0-bin-hadoop3/copy_tables.py", line 10, in
>     .option("zkUrl", "namenode:2181").load()
>   File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 314, in load
>   File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
>   File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
> : java.lang.NoClassDefFoundError: scala/Product$class
>     at org.apache.phoenix.spark.PhoenixRelation.(PhoenixRelation.scala:29)
>     at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:29)
>     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
>     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
>     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
>     at scala.Option.getOrElse(Option.scala:189)
>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.Nati
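A way to answer the "which version of Scala was the jar compiled against" question above: compilers before Scala 2.12 emit trait implementation classes named Foo$class.class (scala.Product$class, the class missing in this trace, is one of them), while 2.12+ replaced them with default methods. Here is a rough Python sketch along those lines; the heuristic and function names are my own, not part of any Spark or Phoenix tooling:

```python
import re
import zipfile

def guess_scala_line(jar_file):
    """Heuristic: jars built with Scala 2.11 or earlier usually contain
    trait implementation classes named *$class.class; Scala 2.12+ does
    not emit them, so their absence is only weak evidence of 2.12+."""
    with zipfile.ZipFile(jar_file) as jar:
        if any(name.endswith("$class.class") for name in jar.namelist()):
            return "2.11 or earlier"
    return "no $class files (possibly 2.12+)"

def suffix_hint(jar_name):
    """Many artifacts also encode the Scala line in the filename,
    e.g. spark-core_2.11-2.4.7.jar; return that suffix if present."""
    m = re.search(r"_(2\.1[0-3])\b", jar_name)
    return m.group(1) if m else None
```

Note that phoenix-spark-5.0.0-HBase-2.0.jar carries no _2.1x suffix in its name, which is why the $class heuristic is useful here: if it reports "2.11 or earlier" for that jar while Spark 3.5.0 runs on Scala 2.12, that mismatch would explain the NoClassDefFoundError.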
Spark / Scala conflict
I am getting the error below when I try to run a spark job connecting to Phoenix. It seems like some part of the code is expecting a different Scala version than the one I have.

I am using Spark 3.5.0, and I have copied these Phoenix jars into the Spark lib:
phoenix-server-hbase-2.5-5.1.3.jar
phoenix-spark-5.0.0-HBase-2.0.jar

I have tried Scala 2.10, 2.11, 2.12, and 2.13.
I do not see the Scala version used in the logs, so I am not 100% sure that it is using the version I expect.

Here is the exception that I am getting:

2023-11-01T16:13:00,391 INFO [Thread-4] handler.ContextHandler: Started o.s.j.s.ServletContextHandler@15cd3b2a{/static/sql,null,AVAILABLE,@Spark}
Traceback (most recent call last):
  File "/hadoop/spark/spark-3.5.0-bin-hadoop3/copy_tables.py", line 10, in
    .option("zkUrl", "namenode:2181").load()
  File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 314, in load
  File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoClassDefFoundError: scala/Product$class
    at org.apache.phoenix.spark.PhoenixRelation.(PhoenixRelation.scala:29)
    at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:29)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    ... 20 more
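Since the Scala version does not show up in the logs, it can also be read off the distribution itself: a standard Spark tarball ships a scala-library jar under $SPARK_HOME/jars with the version in its filename. A small sketch, assuming that standard layout (the function name is mine):

```python
import glob
import os
import re

def spark_scala_version(spark_home):
    """Parse the Scala version out of scala-library-<ver>.jar in
    $SPARK_HOME/jars; returns None if the layout differs."""
    pattern = os.path.join(spark_home, "jars", "scala-library-*.jar")
    for jar in sorted(glob.glob(pattern)):
        m = re.search(r"scala-library-(\d+\.\d+\.\d+)\.jar$", jar)
        if m:
            return m.group(1)
    return None
```

For the setup in this thread, spark_scala_version("/hadoop/spark/spark-3.5.0-bin-hadoop3") would show which Scala line the Spark 3.5.0 install actually ships, so it can be compared against what the Phoenix jars were built for.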
Re: pyspark.ml.recommendation is using the wrong python version
That did not paste well, let me try again.

I am using Python 3.7 and Spark 2.4.7. I am trying to figure out why my job is using the wrong Python version. This is how it is starting up; the logs confirm that I am using Python 3.7, but I later see the error message showing it is trying to use 3.8, and I am not sure where it is picking that up.

SPARK_HOME = /usr/local/lib/python3.7/dist-packages/pyspark

Here is my command:

sudo --preserve-env -u spark pyspark --deploy-mode client --jars /opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p0.15945976/jars/phoenix5-spark-shaded-6.0.0.7.1.7.0-551.jar --verbose --py-files pullhttp/base_http_pull.py --master yarn

Python 3.7.17 (default, Jun 6 2023, 20:10:10) [GCC 9.4.0] on linux

And when I try to run als.fit on my training data I get this:

>>> model = als.fit(training)
[Stage 0:> (0 + 1) / 1]
23/09/04 21:42:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, datanode1, executor 2): org.apache.spark.SparkException: Error from python worker:
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 185, in _run_module_as_main
    mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
  File "/usr/lib/python3.8/runpy.py", line 111, in _get_module_details
    __import__(pkg_name)
  File "", line 991, in _find_and_load
  File "", line 975, in _find_and_load_unlocked
  File "", line 655, in _load_unlocked
  File "", line 618, in _load_backward_compatible
  File "", line 259, in load_module
  File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/__init__.py", line 51, in
  File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 145, in
  File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
TypeError: an integer is required (got type bytes)
PYTHONPATH was: /yarn/nm/usercache/spark/filecache/1130/__spark_libs__3536427065776590449.zip/spark-core_2.11-2.4.7.jar:/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/usr/local/lib/python3.7/dist-packages/pyspark/python/::/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/__pyfiles__:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/py4j-0.10.7-src.zip
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout

On Monday, September 4, 2023 at 10:08:56 PM PDT, Harry Jamison wrote:

I am using Python 3.7 and Spark 2.4.7. I am trying to figure out why my job is using the wrong Python version. This is how it is starting up; the logs confirm that I am using Python 3.7, but I later see the error message showing it is trying to use 3.8, and I am not sure where it is picking that up.

SPARK_HOME = /usr/local/lib/python3.7/dist-packages/pyspark

Here is my command:

sudo --preserve-env -u spark pyspark --deploy-mode client --jars /opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p0.15945976/jars/phoenix5-spark-shaded-6.0.0.7.1.7.0-551.jar --verbose --py-files pullhttp/base_http_pull.py --master yarn

Python 3.7.17 (default, Jun 6 2023, 20:10:10) [GCC 9.4.0] on linux

And when I try to run als.fit on my training data I get this:

>>> model = als.fit(training)
[Stage 0:> (0 + 1) / 1]
23/09/04 21:42:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, datanode1, executor 2): org.apache.spark.SparkException: Error from python worker:
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 185, in _run_module_as_main
    mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
  File "/usr/lib/python3.8/runpy.py", line 111, in _get_module_details
    __import__(pkg_name)
  File "", line 991, in _find_and_load
  File "", line 975, in _find_and_load_unlocked
  File "", line 655, in _load_unlocked
  File "", line 618, in _load_backward_compatible
pyspark.ml.recommendation is using the wrong python version
I am using Python 3.7 and Spark 2.4.7. I am trying to figure out why my job is using the wrong Python version. This is how it is starting up; the logs confirm that I am using Python 3.7, but I later see the error message showing it is trying to use 3.8, and I am not sure where it is picking that up.

SPARK_HOME = /usr/local/lib/python3.7/dist-packages/pyspark

Here is my command:

sudo --preserve-env -u spark pyspark --deploy-mode client --jars /opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p0.15945976/jars/phoenix5-spark-shaded-6.0.0.7.1.7.0-551.jar --verbose --py-files pullhttp/base_http_pull.py --master yarn

Python 3.7.17 (default, Jun 6 2023, 20:10:10) [GCC 9.4.0] on linux

And when I try to run als.fit on my training data I get this:

>>> model = als.fit(training)
[Stage 0:> (0 + 1) / 1]
23/09/04 21:42:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, datanode1, executor 2): org.apache.spark.SparkException: Error from python worker:
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 185, in _run_module_as_main
    mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
  File "/usr/lib/python3.8/runpy.py", line 111, in _get_module_details
    __import__(pkg_name)
  File "", line 991, in _find_and_load
  File "", line 975, in _find_and_load_unlocked
  File "", line 655, in _load_unlocked
  File "", line 618, in _load_backward_compatible
  File "", line 259, in load_module
  File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/__init__.py", line 51, in
  File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 145, in
  File "/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
TypeError: an integer is required (got type bytes)
PYTHONPATH was: /yarn/nm/usercache/spark/filecache/1130/__spark_libs__3536427065776590449.zip/spark-core_2.11-2.4.7.jar:/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/usr/local/lib/python3.7/dist-packages/pyspark/python/::/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/__pyfiles__:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/pyspark.zip:/yarn/nm/usercache/spark/appcache/application_1693107150164_0198/container_e03_1693107150164_0198_01_03/py4j-0.10.7-src.zip
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
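One common source of the mismatch described above is that the YARN executors launch pyspark.daemon with whatever python3 the node managers find (here /usr/lib/python3.8) when the worker interpreter is not pinned explicitly, and the TypeError in the trace is the classic symptom of the cloudpickle bundled with Spark 2.4 running under Python 3.8. A minimal sketch of pinning the interpreter; the /usr/bin/python3.7 path is an assumption about where 3.7 lives on the worker nodes:

```python
import os

# Must be set before any SparkContext is created: PYSPARK_PYTHON is the
# interpreter the executors use to launch pyspark.daemon, and
# PYSPARK_DRIVER_PYTHON is the driver-side interpreter.
WORKER_PYTHON = "/usr/bin/python3.7"  # assumed node-local path
os.environ["PYSPARK_PYTHON"] = WORKER_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = WORKER_PYTHON

# The same can be done per job at submit time via Spark conf, e.g.:
#   --conf spark.pyspark.python=/usr/bin/python3.7
#   --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3.7
```

This only helps if Python 3.7 actually exists at that path on every node; otherwise the executors will fail to start rather than silently fall back to 3.8.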
Re: Spark 2.4.7
Thank you Varun, this makes sense. I understand a separate process for content ingestion. I was thinking it would be a separate Spark job, but it sounds like you are suggesting that ideally I should do it outside of Hadoop entirely?

Thanks
Harry

On Saturday, August 26, 2023 at 09:19:33 AM PDT, Varun Shah wrote:

Hi Harry,

Ideally, you should not be fetching a URL in your transformation job but do the API calls separately (outside the cluster if possible). Ingesting data should be treated separately from transformation / cleaning / join operations. You can create another dataframe of URLs, dedup it if required, and store it in a file where a normal Python function would ingest the data for each URL. After X amount of API calls, create a dataframe for the results and union it with the previous dataframe, finally writing the content and then doing a join with the original dataframe based on URL, if required.

If this is absolutely necessary, here are a few ways to achieve it:

Approach 1:
You can use Spark's foreachPartition, which will require a udf function. In it, you can create a connection to limit the API calls per partition. This can work if you introduce logic that checks the current number of partitions and then distributes the max_api_calls per partition. For example: if no_of_partitions = 4 and total_max_api_calls = 4, you can pass a parameter to this udf with max_partition_api_calls = 1. This approach has limitations, as it requires the max allowed API calls to be greater than the number of partitions.

Approach 2:
An alternative approach is to create the connection outside of the udf with a rate limiter (link) and use this connection variable inside the udf function in each partition, invoking time.sleep. This will definitely introduce issues where many partitions are trying to invoke the API.

I found this Medium article which discusses the issue you are facing, but it does not discuss a solution for the same.
Do check the comments also.

Regards,
Varun

On Sat, Aug 26, 2023 at 10:32 AM Harry Jamison wrote:

I am using Python 3.7 and Spark 2.4.7.

I am not sure what the best way to do this is. I have a dataframe with a URL in one of the columns, and I want to download the contents of that URL and put it in a new column. Can someone point me in the right direction on how to do this? I looked at the UDFs and they seem confusing to me.

Also, is there a good way to rate limit the number of calls I make per second?
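Varun's Approach 2 (a rate limiter plus time.sleep inside the per-partition function) can be sketched in plain Python; the names, the urllib-based fetch, and the default limit below are illustrative assumptions, not tested production code:

```python
import time
import urllib.request

class RateLimiter:
    """Block just long enough to keep at most max_per_sec calls per second."""
    def __init__(self, max_per_sec):
        self.interval = 1.0 / max_per_sec
        self.last = 0.0

    def wait(self):
        delay = self.last + self.interval - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        self.last = time.monotonic()

def fetch_partition(urls, max_per_sec=2):
    # One limiter per partition, so the cluster-wide rate is roughly
    # max_per_sec * number_of_partitions -- the caveat Varun raises
    # about many partitions invoking the API at once.
    limiter = RateLimiter(max_per_sec)
    for url in urls:
        limiter.wait()
        try:
            with urllib.request.urlopen(url, timeout=10) as resp:
                yield url, resp.read().decode("utf-8", "replace")
        except Exception as exc:
            # Keep failures in-band so the join on url still works.
            yield url, "ERROR: %s" % exc
```

With a dataframe df that has a url column, something like df.rdd.map(lambda r: r.url).mapPartitions(fetch_partition) would yield (url, content) pairs that can be turned back into a dataframe and joined on url, per Varun's outline.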
Spark 2.4.7
I am using Python 3.7 and Spark 2.4.7.

I am not sure what the best way to do this is. I have a dataframe with a URL in one of the columns, and I want to download the contents of that URL and put it in a new column. Can someone point me in the right direction on how to do this? I looked at the UDFs and they seem confusing to me.

Also, is there a good way to rate limit the number of calls I make per second?