Re: Unable to use scala function in pyspark

2021-09-26 Thread Sean Owen
You can also call a Scala UDF from Python in Spark - this doesn't need
Zeppelin or relate to the front-end.
This may indeed be much easier as a proper UDF; depends on what this
function does.
However I think the issue may be that you're trying to wrap the resulting
DataFrame in a DataFrame or something. First inspect what you get back from
the invocation of the Scala method.
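
A note on the traceback quoted below: the AttributeError appears to be raised while Py4J converts the call arguments (it tries to turn the Python StructType into a Java list of StructField objects), so the Scala method may never be reached. A minimal sketch of one way around that, assuming the Scala signature quoted below, is to send the schema across as JSON and rebuild it on the JVM side with DataType.fromJson. The helper names here are hypothetical, and the package path com.mytest.spark is taken from the original post.

```python
def to_jvm_schema(sc, schema):
    """Rebuild a PySpark StructType on the JVM side from its JSON form."""
    # schema.json() is plain text, which Py4J can pass across the gateway.
    return sc._jvm.org.apache.spark.sql.types.DataType.fromJson(schema.json())


def call_scala_customdf(sc, spark, df, schema):
    """Hypothetical wrapper around the Scala method from the original post."""
    # Imported lazily so this module loads even without PySpark installed.
    from pyspark.sql import DataFrame

    util = sc._jvm.com.mytest.spark.PythonUtil
    jdf = util.customdf(df._jdf, "SSN", "table", to_jvm_schema(sc, schema), "test")
    # Inspect jdf first (for example jdf.schema().treeString()) before wrapping it.
    return DataFrame(jdf, spark._wrapped)  # spark._wrapped as used in the post (Spark 3.0.x)
```

Only the schema argument changes compared with the original call; the DataFrame is still passed as `_jdf`.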


On Sun, Sep 26, 2021 at 5:50 PM Jeff Zhang  wrote:

> Hi kumar,
>
> You can try Zeppelin, which supports UDF sharing across languages:
>
> http://zeppelin.apache.org/
>
>
>
>
> On Mon, Sep 27, 2021 at 4:20 AM rahul kumar wrote:
>
>> I'm trying to use a function defined in a Scala jar in PySpark (Spark
>> 3.0.2).
>>
>> --scala ---
>>
>> object PythonUtil {
>>
>> def customdf(dataFrame: DataFrame,
>>  keyCol: String,
>>  table: String,
>>  outputSchema: StructType,
>>  database: String): DataFrame = {
>>
>> // some transformation of dataframe and convert as per the output schema
>> // types and fields.
>> ...
>> resultDF
>> }
>> }
>>
>> //In jupyter notebook
>> schema creation:
>> alias = StructType([StructField("first_name", StringType(),
>> False),StructField("last_name", StringType(), False)])
>> name = StructType([StructField("first_name", StringType(),
>> False),StructField("aliases", ArrayType(alias), False)])
>> street_adress = StructType([StructField("street_name", StringType(),
>> False),StructField("apt_number", IntegerType(), False)])
>> address = StructType([StructField("zip", LongType(),
>> False),StructField("street", street_adress, False),StructField("city",
>> StringType(), False)])
>> workHistory = StructType([StructField("company_name", StringType(),
>> False),StructField("company_address", address,
>> False),StructField("worked_from", StringType(), False)])
>>
>> //passing this to scala function.
>> outputschema= StructType([StructField("name", name,
>> False),StructField("SSN", StringType(), False),StructField("home_address",
>> ArrayType(address), False)])
>>
>> ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
>> ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
>> customerIdsDF=spark.createDataFrame(ssns,["SSN"])
>>
>> scala2_object= sc._jvm.com.mytest.spark.PythonUtil
>> pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
>> 'table', outputschema, 'test'), spark._wrapped)
>>
>> Then I get an error that AttributeError: 'StructField' object has no
>> attribute '_get_object_id'
>>
>> full stacktrace
>>
>> ---
>> AttributeError                            Traceback (most recent call last)
>>  in 
>>   4
>>   5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
>> > 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf,
>> 'SSN', 'table',smallSchema, 'test'), spark._wrapped)
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
>> in __call__(self, *args)
>>1294
>>1295 def __call__(self, *args):
>> -> 1296 args_command, temp_args = self._build_args(*args)
>>1297
>>1298 command = proto.CALL_COMMAND_NAME +\
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
>> in _build_args(self, *args)
>>1258 def _build_args(self, *args):
>>1259 if self.converters is not None and len(self.converters) >
>> 0:
>> -> 1260 (new_args, temp_args) = self._get_args(args)
>>1261 else:
>>1262 new_args = args
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
>> in _get_args(self, args)
>>1245 for converter in self.gateway_client.converters:
>>1246 if converter.can_convert(arg):
>> -> 1247 temp_arg = converter.convert(arg,
>> self.gateway_client)
>>1248 temp_args.append(temp_arg)
>>1249 new_args.append(temp_arg)
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py
>> in convert(self, object, gateway_client)
>> 509 java_list = ArrayList()
>> 510 for element in object:
>> --> 511 java_list.add(element)
>> 512 return java_list
>> 513
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
>> in __call__(self, *args)
>>1294
>>1295 def __call__(self, *args):
>> -> 1296 args_command, temp_args = self._build_args(*args)
>>1297
>>1298 command = proto.CALL_COMMAND_NAME +\
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
>> in _build_args(self, *args)
>>1264
>>1265 args_command = "".join(
>> -> 1266 [get_command_part(arg, self.pool) for arg in
>> new_args])
>>1267
>>1268 return args_command, temp_args
>>

Re: Spark DStream application memory leak debugging

2021-09-25 Thread Sean Owen
It could be 'normal' - executors won't GC unless they need to.
It could be state in your application, if you're storing state.
You'd want to dump the heap to take a first look.

On Sat, Sep 25, 2021 at 7:24 AM Kiran Biswal  wrote:

> Hello Experts
>
> I have a spark streaming application(DStream). I use spark 3.0.2, scala
> 2.12 This application reads about 20 different kafka topics and produces a
> single stream and I filter the RDD per topic and store in cassandra
>
> I see that there is a steady increase in executor memory over the hours
> until it reaches max allocated memory and then it stays at that value. No
> matter how much I allocate to the executor, this pattern is seen. I suspect
> a memory leak.
>
> Any guidance you may be able to provide as to how to debug will be highly
> appreciated.
>
> Thanks in advance
> Regards
> Kiran
>


Re: SparkDF writing null values when not to database

2021-09-22 Thread Sean Owen
What is null, what is the type, does it make sense in postgres, etc. Need
more info.

On Wed, Sep 22, 2021 at 9:18 AM Aram Morcecian  wrote:

> Hi everyone, I'm facing something weird. After doing some transformations
> to a SparkDF I print some rows and I see those perfectly, but when I write
> that to the database (postgres) one of the transformed columns is written
> with all null values in it.
> I tried doing it with DynamicFrames (AWS Glue) and it works ok, but since
> I need to overwrite the table that I'm writing on, I need to use SparkDF.
>
> I'm attaching the code that I'm using to do this. Could someone help
> me with this?
>
> Thank you
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Does Apache Spark 3 support GPU usage for Spark RDDs?

2021-09-21 Thread Sean Owen
spark-rapids is not part of Spark, so couldn't speak to it, but Spark
itself does not use GPUs at all.
It does let you configure a task to request a certain number of GPUs, and
that would work for RDDs, but it's up to the code being executed to use the
GPUs.
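
As a rough sketch of what "configure a task to request GPUs" can look like: the configuration keys below are the Spark 3 resource settings, while the helper names and the discovery script path are hypothetical. Spark only hands GPU addresses to the task; the user code still has to drive the device itself.

```python
def gpu_resource_conf(gpus_per_executor, gpus_per_task, discovery_script):
    """Build the Spark 3 settings that make executors and tasks request GPUs."""
    return {
        "spark.executor.resource.gpu.amount": str(gpus_per_executor),
        "spark.task.resource.gpu.amount": str(gpus_per_task),
        # Script that reports the GPU addresses available on each executor host.
        "spark.executor.resource.gpu.discoveryScript": discovery_script,
    }


def as_submit_args(conf):
    """Render a settings dict as spark-submit style --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


def gpu_addresses_for_task():
    """Inside a running task (e.g. in mapPartitions), list the assigned GPUs."""
    # Imported lazily so this module loads even without PySpark installed.
    from pyspark import TaskContext

    return TaskContext.get().resources()["gpu"].addresses
```

A call such as `as_submit_args(gpu_resource_conf(1, 1, "/path/to/getGpus.sh"))` (path assumed) yields the flags to pass at submit time; an RDD function would then call `gpu_addresses_for_task()` and hand those addresses to whatever GPU library it uses.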

On Tue, Sep 21, 2021 at 1:23 PM Abhishek Shakya 
wrote:

>
> Hi,
>
> I am currently trying to run genomic analyses pipelines using Hail(library
> for genomics analyses written in python and Scala). Recently, Apache Spark
> 3 was released and it supported GPU usage.
>
> I tried spark-rapids library to start an on-premise slurm cluster with gpu
> nodes. I was able to initialise the cluster. However, when I tried running
> hail tasks, the executors kept getting killed.
>
> On querying in Hail forum, I got the response that
>
> That’s a GPU code generator for Spark-SQL, and Hail doesn’t use any
> Spark-SQL interfaces, only the RDD interfaces.
> So, does Spark3 not support GPU usage for RDD interfaces?
>
>
> PS: The question is posted in stackoverflow as well: Link
> 
>
>
> Regards,
> -
>
> Abhishek Shakya
>


Re: [apache-spark][Spark SQL][Debug] Maven Spark build fails while compiling spark-hive-thriftserver_2.12 for Hadoop 2.10.1

2021-09-17 Thread Sean Owen
I don't think that has ever shown up in the CI/CD builds, and I can't recall
anyone reporting this. What did you change? It may be some local env issue.

On Fri, Sep 17, 2021 at 7:09 AM Enrico Minardi 
wrote:

>
> Hello,
>
>
> the Maven build of Apache Spark 3.1.2 for user-provided Hadoop 2.10.1 with
> Hive and Hive-Thriftserver profiles fails while compiling
> spark-hive-thriftserver_2.12.
>
>
> I am most probably missing something. Could you please help?
>
>
> I have searched the Scala-Maven-Plugin website (
> https://davidb.github.io/scala-maven-plugin/usage.html), Stack Overflow /
> Stack Exchange, and the ASF user-list archive, but could not progress on
> this issue.
>
>
> The Maven version is 3.6.3 and the Scala version is 2.12 (not installed, as
> mentioned here: https://davidb.github.io/scala-maven-plugin/index.html).
>
>
> Thank you very much for any suggestion. I would appreciate it very much.
>
>
> Kind regards,
>
> Enrico
>
> --
>
>
> The error message is:
>
>
> Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:4.3.0:compile
> (scala-compile-first) on project spark-hive-thriftserver_2.12:
> Execution scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:4.3.0:compile failed:
> java.lang.AssertionError: assertion failed: Expected protocol to be 'file'
> or empty in URI
> jar:file:/home/vmuser/.m2/repository/org/eclipse/jetty/jetty-server/9.4.40.v20210413/jetty-server-9.4.40.v20210413.jar!/org/eclipse/jetty/server/AbstractConnectionFactory.class
>
>
> I have adjusted the ./pom.xml as follows:
>
>
> 
>   hadoop-provided
>   
> 2.10.1
> 2.13.0
> 2.4
> servlet-api
>   
> (...)
>
>
> The command I run is:
>
>
> ./dev/make-distribution.sh -X -q -Phadoop-provided -Pyarn -Pkubernetes
> -Pscala-2.12 -Phive -Phive-thriftserver -DzincPort=3036
>
>


Re: Lock issue with SQLConf.getConf

2021-09-11 Thread Sean Owen
Looks like this was improved in
https://issues.apache.org/jira/browse/SPARK-35701 for 3.2.0

On Fri, Sep 10, 2021 at 10:21 PM Kohki Nishio  wrote:

> Hello,
> I'm running Spark in local mode and seeing multiple threads like the ones
> below. Does anybody know why it's not using a concurrent hash map?
>
> ---
> "request-handler-dispatcher-19785" #107532 prio=5 os_prio=0
> tid=0x7fbd78036000 nid=0x4ebf runnable [0x7fc6e83af000]
>java.lang.Thread.State: RUNNABLE
> at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
> - locked <0x7fc901c7d9f8> (a java.util.Collections$SynchronizedMap)
> at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
> at
> org.apache.spark.sql.internal.SQLConf.constraintPropagationEnabled(SQLConf.scala:3346)
>
> "Executor task launch worker for task 23.0 in stage 279783.0 (TID 449746)"
> #107929 daemon prio=5 os_prio=0 tid=0x7fbe2c005000 nid=0x55f6 waiting
> for monitor entry [0x7fc6e4037000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
> - waiting to lock <0x7fc901c7d9f8> (a
> java.util.Collections$SynchronizedMap)
> at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
> at
> org.apache.spark.sql.internal.SQLConf.methodSplitThreshold(SQLConf.scala:3330)
>
> "Executor task launch worker for task 16.0 in stage 279883.0 (TID 449728)"
> #107690 daemon prio=5 os_prio=0 tid=0x7fbc60056800 nid=0x53c6 waiting
> for monitor entry [0x7fc65a3a8000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
> - waiting to lock <0x7fc901c7d9f8> (a
> java.util.Collections$SynchronizedMap)
> at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
> at
> org.apache.spark.sql.internal.SQLConf.planChangeLogLevel(SQLConf.scala:3160)
> at
> org.apache.spark.sql.catalyst.rules.PlanChangeLogger.(RuleExecutor.scala:49)
>
> ---
>
>
> --
> Kohki Nishio
>


Re: issue in Apache Spark install

2021-09-09 Thread Sean Owen
- other lists, please don't cross post to 4 lists (!)

This is a problem you'd see with Java 9 or later - I assume you're running
that under the hood. However it should be handled by Spark in the case that
you can't access certain things in Java 9+, and this may be a bug I'll look
into. In the meantime you could try adding "--add-opens
java.base/java.lang=ALL-UNNAMED" to the Java command line if you can, or
try to use Java 8, as a temporary workaround, to see if that helps.


On Wed, Sep 8, 2021 at 6:50 AM Mukhtar Ali  wrote:

> Dear all,
>
> I am a learning member of https://learning.oreilly.com and have a
> problem installing Apache Spark.
> I tried both CMD and Jupyter and get the same issue:
> *Exception: Java gateway process exited before sending its port number*
> Please help resolve this issue.
> See the attachment for the Jupyter output.
>
>
> In CMD
> C:\Users\User>pyspark
> Python 3.8.8 (default, Apr 13 2021, 15:08:03) [MSC v.1916 64 bit (AMD64)]
> :: Anaconda, Inc. on win32
>
> Warning:
> This Python interpreter is in a conda environment, but the environment has
> not been activated.  Libraries may fail to load.  To activate this
> environment
> please see https://conda.io/activation
>
> Type "help", "copyright", "credits" or "license" for more information.
> Exception in thread "main" java.lang.ExceptionInInitializerError
> at
> org.apache.spark.unsafe.array.ByteArrayMethods.(ByteArrayMethods.java:54)
> at
> org.apache.spark.internal.config.package$.(package.scala:1095)
> at
> org.apache.spark.internal.config.package$.(package.scala)
> at
> org.apache.spark.deploy.SparkSubmitArguments.$anonfun$loadEnvironmentArguments$3(SparkSubmitArguments.scala:157)
> at scala.Option.orElse(Option.scala:447)
> at
> org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:157)
> at
> org.apache.spark.deploy.SparkSubmitArguments.(SparkSubmitArguments.scala:115)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.(SparkSubmit.scala:1022)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:1022)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
> private java.nio.DirectByteBuffer(long,int) accessible: module java.base
> does not "opens java.nio" to unnamed module @71e9ddb4
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
> at
> java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:188)
> at
> java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:181)
> at org.apache.spark.unsafe.Platform.(Platform.java:56)
> ... 13 more
> Traceback (most recent call last):
>   File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\shell.py", line
> 35, in 
> SparkContext._ensure_initialized()  # type: ignore
>   File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\context.py",
> line 331, in _ensure_initialized
> SparkContext._gateway = gateway or launch_gateway(conf)
>   File
> "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\java_gateway.py", line
> 108, in launch_gateway
> raise Exception("Java gateway process exited before sending its port
> number")
> Exception: Java gateway process exited before sending its port number
>


Re: JavaSerializerInstance is slow

2021-09-03 Thread Sean Owen
I don't know if java serialization is slow in that case; that shows
blocking on a class load, which may or may not be directly due to
deserialization.
Indeed I don't think (some) things are serialized in local mode within one
JVM, so not sure that's actually what's going on.

On Thu, Sep 2, 2021 at 11:58 PM Antonin Delpeuch (lists) <
li...@antonin.delpeuch.eu> wrote:

> Hi Kohki,
>
> Serialization of tasks happens in local mode too and as far as I am
> aware there is no way to disable this (although it would definitely be
> useful in my opinion).
>
> You can see the local mode as a testing mode, in which you would want to
> catch any serialization errors, before they appear in production.
>
> There are also some important bugs that are present in local mode and
> are not deemed worth fixing because it is not intended to be used in
> production (https://issues.apache.org/jira/browse/SPARK-5300).
>
> I think there would definitely be interest in having a reliable and
> efficient local mode in Spark but it's a pretty different use case than
> what Spark originally focused on.
>
> Antonin
>
> On 03/09/2021 05:56, Kohki Nishio wrote:
> > I'm seeing many threads doing deserialization of a task, I understand
> > since lambda is involved, we can't use Kryo for those purposes.
> > However I'm running it in local mode, this serialization is not really
> > necessary, no?
> >
> > Is there any trick I can apply to get rid of this thread contention ?
> > I'm seeing many of the below threads in thread dumps ...
> >
> >
> > "Executor task launch worker for task 11.0 in stage 15472514.0 (TID
> > 19788863)" #732821 daemon prio=5 os_prio=0 tid=0x7f02581b2800
> > nid=0x355d waiting for monitor entry [0x7effd1e3f000]
> >java.lang.Thread.State: BLOCKED (on object monitor)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:400)
> > - waiting to lock <0x7f0f7246edf8> (a java.lang.Object)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> > at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:51)
> > at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
> >
> >
> > Thanks
> > -Kohki
>
>
>


Re: Processing Multiple Streams in a Single Job

2021-08-27 Thread Sean Owen
That is something else. Yes, you can create a single, complex stream job
that joins different data sources, etc. That is not different than any
other Spark usage. What are you looking for w.r.t. docs?

We are also saying you can simply run N unrelated streaming jobs in
parallel on the driver, which is a few lines of code but not specific to
Spark.

Which one you want depends on what you are trying to do, which I'm not
clear on.

On Fri, Aug 27, 2021 at 9:12 AM Artemis User  wrote:

> Thanks Mich.  I understand now how to deal with multiple streams in a single
> job, but the responses I got before were very abstract and confusing.  So I
> had to go back to the Spark doc and figure out the details.  This is what I
> found out:
>
>    1. The standard and recommended way to do multi-stream processing in
>    Structured Streaming (not DStream) is to use the join operations.  No need
>    to use collections and mapping functions (I guess these could be for
>    DStream).  Once you have created the combined/joined DF, you use that
>    DF's stream writer to process each microbatch or event data before dumping
>    results to the output sink (the official Spark doc on this isn't very clear
>    and the coding examples were not complete:
>    https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations).
>    2. The semantics of multi-stream processing are really mind-boggling.
>    You have to clearly define inner and outer joins along with watermark
>    conditions.  We are still in the process of detailing our use cases since
>    we need to process three streams simultaneously in near real-time, and we
>    don't want to have any blocking situations (i.e. one stream source doesn't
>    produce any data for a considerable time period).  If you or anyone has
>    any suggestions, I'd appreciate your comments.
>
> Thanks!  -- ND
>
> On 8/26/21 2:38 AM, Mich Talebzadeh wrote:
>
> Hi ND,
>
> Within the same Spark job you can handle two topics simultaneously with
> Spark Structured Streaming (SSS). Is that what you are implying?
>
> HTH
>
>
> On Tue, 24 Aug 2021 at 23:37, Artemis User  wrote:
>
>> Is there a way to run multiple streams in a single Spark job using
>> Structured Streaming?  If not, is there an easy way to perform inter-job
>> communications (e.g. referencing a dataframe among concurrent jobs) in
>> Spark?  Thanks a lot in advance!
>>
>> -- ND
>>
>>
>>
>


Re: Processing Multiple Streams in a Single Job

2021-08-25 Thread Sean Owen
This part isn't Spark specific, just a matter of running code in parallel
on the driver (that happens to start streaming jobs). In Scala it's things
like .par collections, in Python it's something like multiprocessing.
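
A minimal sketch of "running code in parallel on the driver", with nothing Spark-specific in it. It uses a thread pool rather than multiprocessing, on the assumption that the jobs need to share the driver's single SparkSession; the function name is hypothetical, and each job is simply a zero-argument callable that would start and wait on its own streaming job.

```python
from concurrent.futures import ThreadPoolExecutor


def run_jobs_in_parallel(jobs, max_workers=None):
    """Run each zero-argument callable on its own driver-side thread.

    Results come back in the same order as `jobs`; an exception raised
    by any job is re-raised here when its result is collected.
    """
    if not jobs:
        return []
    with ThreadPoolExecutor(max_workers=max_workers or len(jobs)) as pool:
        futures = [pool.submit(job) for job in jobs]
        return [future.result() for future in futures]
```

Each callable would typically wrap one independent pipeline, for example a function that builds a stream reader, starts its writer and blocks on the resulting query.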

On Wed, Aug 25, 2021 at 8:48 AM Artemis User  wrote:

> Thanks Sean.  Excuse my ignorance, but I just can't figure out how to
> create a collection across multiple streams using multiple stream readers.
> Could you provide some examples or additional references?  Thanks!
>
> On 8/24/21 11:01 PM, Sean Owen wrote:
>
> No, that applies to the streaming DataFrame API too.
> No, jobs can't communicate with each other.
>
> On Tue, Aug 24, 2021 at 9:51 PM Artemis User 
> wrote:
>
>> Thanks Daniel.  I guess you were suggesting using DStream/RDD.  Would it
>> be possible to use structured streaming/DataFrames for multi-source
>> streaming?  In addition, we really need each stream data ingestion to be
>> asynchronous or non-blocking...  thanks!
>>
>> On 8/24/21 9:27 PM, daniel williams wrote:
>>
>> Yeah. Build up the streams as a collection and map that query to the
>> start() invocation and map those results to awaitTermination() or whatever
>> other blocking mechanism you’d like to use.
>>
>> On Tue, Aug 24, 2021 at 4:37 PM Artemis User 
>> wrote:
>>
>>> Is there a way to run multiple streams in a single Spark job using
>>> Structured Streaming?  If not, is there an easy way to perform inter-job
>>> communications (e.g. referencing a dataframe among concurrent jobs) in
>>> Spark?  Thanks a lot in advance!
>>>
>>> -- ND
>>>
>>>
>>> --
>> -dan
>>
>>
>>
>


Re: Processing Multiple Streams in a Single Job

2021-08-24 Thread Sean Owen
No, that applies to the streaming DataFrame API too.
No, jobs can't communicate with each other.
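
The pattern suggested earlier in the thread (build up the streams as a collection, call start() on each, then block) might look roughly like this with the streaming DataFrame API. The helper names are hypothetical; the writers are assumed to be fully configured DataStreamWriter objects, for example `df.writeStream.format(...).option("checkpointLocation", ...)`.

```python
def start_all(writers):
    """Start every configured stream writer and return the running queries.

    start() returns immediately, so all queries run concurrently.
    """
    return [writer.start() for writer in writers]


def await_all(queries):
    """Block the driver until every query has terminated."""
    for query in queries:
        query.awaitTermination()
```

If stopping as soon as any one query ends is preferable, `spark.streams.awaitAnyTermination()` can replace `await_all`.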

On Tue, Aug 24, 2021 at 9:51 PM Artemis User  wrote:

> Thanks Daniel.  I guess you were suggesting using DStream/RDD.  Would it
> be possible to use structured streaming/DataFrames for multi-source
> streaming?  In addition, we really need each stream data ingestion to be
> asynchronous or non-blocking...  thanks!
>
> On 8/24/21 9:27 PM, daniel williams wrote:
>
> Yeah. Build up the streams as a collection and map that query to the
> start() invocation and map those results to awaitTermination() or whatever
> other blocking mechanism you’d like to use.
>
> On Tue, Aug 24, 2021 at 4:37 PM Artemis User 
> wrote:
>
>> Is there a way to run multiple streams in a single Spark job using
>> Structured Streaming?  If not, is there an easy way to perform inter-job
>> communications (e.g. referencing a dataframe among concurrent jobs) in
>> Spark?  Thanks a lot in advance!
>>
>> -- ND
>>
>>
>> --
> -dan
>
>
>


Re: AWS EMR SPARK 3.1.1 date issues

2021-08-23 Thread Sean Owen
Date handling was tightened up in Spark 3. I think you need to compare to a
date literal, not a string literal.
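
A small sketch of the difference, building the query with a typed DATE literal instead of a bare string. The table and column names used in the usage note are hypothetical, since the original post elides them, and the helper itself is only an illustration.

```python
import datetime


def date_filter_sql(table, column, cutoff):
    """Build a query that compares `column` against a DATE literal."""
    if isinstance(cutoff, datetime.datetime):
        cutoff = cutoff.date()  # drop the time part, keep the calendar day
    if not isinstance(cutoff, datetime.date):
        raise TypeError("cutoff must be a datetime.date")
    # DATE '...' is a typed literal, unlike the plain string '2021-03-01'.
    return f"SELECT * FROM {table} WHERE {column} > DATE '{cutoff.isoformat()}'"
```

For example, `spark.sql(date_filter_sql("events", "event_date", datetime.date(2021, 3, 1)))` sends a date comparison rather than a string comparison. Whether this avoids the Glue error from the original post is untested here.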

On Mon, Aug 23, 2021 at 5:12 AM Gourav Sengupta <
gourav.sengupta.develo...@gmail.com> wrote:

> Hi,
>
> while I am running in EMR 6.3.0 (SPARK 3.1.1) a simple query as "SELECT *
> FROM  WHERE  > '2021-03-01'" the query is
> failing with error:
> ---
> pyspark.sql.utils.AnalysisException:
> org.apache.hadoop.hive.metastore.api.InvalidObjectException: Unsupported
> expression '2021 - 03 - 01' (Service: AWSGlue; Status Code: 400; Error
> Code: InvalidInputException; Request ID:
> dd3549c2-2eeb-4616-8dc5-5887ba43dd22; Proxy: null)
> ---
>
> The above query works fine in all previous versions of SPARK.
>
> Is this the expected behaviour in SPARK 3.1.1? If so can someone please
> let me know how to write this query.
>
> Also, if this is the expected behaviour, I think a lot of users will
> have to make these changes in their existing code, making the transition to
> SPARK 3.1.1 expensive.
>
> Regards,
> Gourav Sengupta
>


Re: How can I read ftp

2021-08-08 Thread Sean Owen
FTP is definitely not supported. Read the files to distributed storage
first then read from there.
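
A rough sketch of the "copy first, then read" approach using Python's standard ftplib. The function name and the staging paths are hypothetical, and it assumes the remote directory holds only plain files. It stages to a local directory; pushing that directory to HDFS or S3 is a separate step not shown here.

```python
import os


def stage_ftp_dir(ftp, remote_dir, local_dir):
    """Download every file in `remote_dir` into `local_dir`.

    `ftp` is an already connected and logged-in ftplib.FTP object.
    Returns the list of local file paths that were written.
    """
    os.makedirs(local_dir, exist_ok=True)
    ftp.cwd(remote_dir)
    staged = []
    for name in ftp.nlst():
        target = os.path.join(local_dir, os.path.basename(name))
        with open(target, "wb") as out:
            ftp.retrbinary("RETR " + name, out.write)
        staged.append(target)
    return staged


# Usage sketch (host, credentials and paths are placeholders):
#   from ftplib import FTP
#   with FTP("ftp.example.com") as ftp:
#       ftp.login("user", "password")
#       stage_ftp_dir(ftp, "/sparkftp", "/tmp/sparkftp")
#   # copy /tmp/sparkftp to distributed storage, then read it with spark.read.csv
```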

On Sun, Aug 8, 2021, 10:18 PM igyu  wrote:

> val ftpUrl = "ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/"
>
> val schemas = StructType(List(
> new StructField("name", DataTypes.StringType, true),
> new StructField("age", DataTypes.IntegerType, true),
> new StructField("remk", DataTypes.StringType, true)))
>
>val DF = sparkSession.read.format("csv")
>   .schema(schemas)
>   .option("header","true")
>   .load(ftpUrl)
> //  .filter("created<=1602864000")
>
> DF.printSchema()
> DF.show()
>
> I get error
>
> Exception in thread "main" java.lang.IllegalArgumentException: Illegal
> pattern component: XXX
> at
> org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
> at
> org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
> at
> org.apache.commons.lang3.time.FastDatePrinter.(FastDatePrinter.java:142)
> at
> org.apache.commons.lang3.time.FastDateFormat.(FastDateFormat.java:384)
> at
> org.apache.commons.lang3.time.FastDateFormat.(FastDateFormat.java:369)
> at
> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
> at
> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
> at
> org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
> at
> org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVOptions.(CSVOptions.scala:139)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVOptions.(CSVOptions.scala:41)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
> at
> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
> at
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:615)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
> at
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
> at
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
> at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
> at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
> at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
> at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
> at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
> at com.join.ftp.reader.FtpReader.readFrom(FtpReader.scala:40)
> at com.join.synctool$.main(synctool.scala:41)
> at com.join.synctool.main(synctool.scala)
> 21/08/09 11:15:08 INFO SparkContext: Invoking stop() from shutdown hook
>
> --
> igyu
>


Re: [Spark Core, PySpark] Separate stage level scheduling for consecutive map functions

2021-08-05 Thread Sean Owen
Doesn't a persist break stages?

On Thu, Aug 5, 2021, 11:40 AM Tom Graves 
wrote:

> As Sean mentioned, it's only available at the stage level, but you said you
> don't want to shuffle, so splitting into stages doesn't help you.  Without more
> details it seems like you could "hack" this by just requesting an executor
> with 1 GPU (allowing 2 tasks per GPU) and 2 CPUs, and the one task would use
> the GPU and the other could just use the CPU.  Perhaps that is too
> simplistic or brittle though.
>
> Tom
> On Saturday, July 31, 2021, 03:56:18 AM CDT, Andreas Kunft <
> andreas.ku...@gmail.com> wrote:
>
>
> I have a setup with two work intensive tasks, one map using GPU followed
> by a map using only CPU.
>
> Using stage level resource scheduling, I request a GPU node, but would
> also like to execute the consecutive CPU map on a different executor so
> that the GPU node is not blocked.
>
> However, spark will always combine the two maps due to the narrow
> dependency, and thus, I can not define two different resource requirements.
>
> So the question is: can I force the two map functions on different
> executors without shuffling or even better is there a plan to enable this
> by assigning different resource requirements.
>
> Best
>


Re: [Spark Core, PySpark] Separate stage level scheduling for consecutive map functions

2021-08-01 Thread Sean Owen
Oh I see, I missed that. You can specify at the stage level, nice. I think
you are more looking to break these operations into two stages. You can do
that with a persist or something - which has a cost but may work fine.
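
For reference, a sketch of what specifying resources at the stage level looks like in PySpark 3.1, using the pyspark.resource builders. The helper name and the amounts are illustrative assumptions. The profile applies to the stage that computes the RDD it is attached to, so it does not by itself separate two consecutive maps.

```python
def build_gpu_profile(executor_cores=2, gpus_per_executor=1, gpus_per_task=1):
    """Build a ResourceProfile asking for GPU executors and GPU tasks."""
    # Imported lazily so this module loads even without PySpark installed.
    from pyspark.resource import (
        ExecutorResourceRequests,
        ResourceProfileBuilder,
        TaskResourceRequests,
    )

    executor_reqs = (
        ExecutorResourceRequests()
        .cores(executor_cores)
        .resource("gpu", gpus_per_executor)
    )
    task_reqs = TaskResourceRequests().cpus(1).resource("gpu", gpus_per_task)
    # In PySpark, `build` is a property, not a method call.
    return ResourceProfileBuilder().require(executor_reqs).require(task_reqs).build
```

The profile would then be attached with something like `gpu_rdd = rdd.map(gpu_fn).withResources(build_gpu_profile())`, where `gpu_fn` is a placeholder for the GPU map function.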

Does it actually help much with GPU utilization? In theory yes, but I
wonder whether these two stages, being so tightly bound, typically execute at
meaningfully different times. Your use case also seems to entail moving the
work across executors, which would have overhead.

Stage is pretty much the lowest level of granularity, so no. Spark does not
schedule functions, it plans stages. I think this is a question of
splitting things that can be in a stage (usually a very good thing) apart
in this rarer case, not any change in Spark.

Yes DL workloads are important but distributed DL on Spark is already well
handled by third party libs. I'm not sure this is about DL specifically
anyway. Not everything should be in Spark itself.



On Sun, Aug 1, 2021, 11:28 AM Andreas Kunft  wrote:

> Hi,
>
> @Sean: Since Spark 3.x, stage level resource scheduling is available:
> https://databricks.com/session_na21/stage-level-scheduling-improving-big-data-and-ai-integration
>
> @Gourav: I'm using the latest version of Spark, 3.1.2. I want to split the
> two maps on different executors, as both the GPU function and the CPU
> function take quite some time, so it would be great to have element n
> being processed in the GPU function while n + 1 is already computed in
> the CPU function. As a workaround, I write the results of the GPU task to
> a queue which is consumed by another job that executes the CPU task.
>
> Do you have any idea, if resource assignment based scheduling for
> functions is a planned feature for the future?
>
> Best
> Andreas
>
>
> On Sun, Aug 1, 2021 at 6:53 PM Gourav Sengupta 
> wrote:
>
>> Hi Andreas,
>>
>> just to understand the question first, what is it you want to achieve by
>> breaking the map operations across the GPU and CPU?
>>
>> Also it will be wonderful to understand the version of SPARK you are
>> using, and your GPU details a bit more.
>>
>>
>> Regards,
>> Gourav
>>
>> On Sat, Jul 31, 2021 at 9:57 AM Andreas Kunft 
>> wrote:
>>
>>> I have a setup with two work intensive tasks, one map using GPU followed
>>> by a map using only CPU.
>>>
>>> Using stage level resource scheduling, I request a GPU node, but would
>>> also like to execute the consecutive CPU map on a different executor so
>>> that the GPU node is not blocked.
>>>
>>> However, spark will always combine the two maps due to the narrow
>>> dependency, and thus, I can not define two different resource requirements.
>>>
>>> So the question is: can I force the two map functions on different
>>> executors without shuffling or even better is there a plan to enable this
>>> by assigning different resource requirements.
>>>
>>> Best
>>>
>>


Re: [Spark Core, PySpark] Separate stage level scheduling for consecutive map functions

2021-07-31 Thread Sean Owen
No, unless I'm crazy you can't even change resource requirements at the
job level let alone stage. Does it help you though? Is something else even
able to use the GPU otherwise?

On Sat, Jul 31, 2021, 3:56 AM Andreas Kunft  wrote:

> I have a setup with two work intensive tasks, one map using GPU followed
> by a map using only CPU.
>
> Using stage level resource scheduling, I request a GPU node, but would
> also like to execute the consecutive CPU map on a different executor so
> that the GPU node is not blocked.
>
> However, spark will always combine the two maps due to the narrow
> dependency, and thus, I can not define two different resource requirements.
>
> So the question is: can I force the two map functions on different
> executors without shuffling or even better is there a plan to enable this
> by assigning different resource requirements.
>
> Best
>


Re: Cloudera Parcel : spark issues after upgrade 1.6 to 2.4

2021-07-30 Thread Sean Owen
(This is a list of OSS Spark - anything vendor-specific should go to vendor
lists for better answers.)

On Fri, Jul 30, 2021 at 8:35 AM Harsh Sharma 
wrote:

> Hi Team,
>
> We are upgrading our Cloudera parcels from 5.x to 6.x, hence we have
> upgraded the version of Spark from 1.6 to 2.4. While executing a Spark
> program we are getting the error below.
> Please help us resolve this in Cloudera parcels. There are suggestions to
> install Spark Gateway roles; if that is the case, guide us on how to do that.
> Failed to find client configuration. If this host is managed by Cloudera
> Manager, please install Spark Gateway Role on this host to run Spark Jobs.
> Otherwise, please configure the Spark dependencies correctly.
>
> PS : we are not using cloudera manager
>
>
>
>
>
>
>
>


Re: Advanced GC Tuning

2021-07-20 Thread Sean Owen
You're right, I think storageFraction is somewhat better to control this,
although some things 'counted' in spark.memory.fraction will also be
long-lived and in the OldGen.
You can also increase the OldGen size if you're pretty sure that's the
issue - 'old' objects in the YoungGen.

I'm not sure how much these will affect performance with modern JVMs; this
advice is 5-9 years old.
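
To make the difference between the two settings concrete, here is a rough
sketch of the arithmetic. It assumes the unified memory model described in
the Spark tuning guide (about 300 MiB reserved, spark.memory.fraction of
the remainder shared by execution and storage, and
spark.memory.storageFraction of that share protected from eviction). The
function name and the 8 GiB heap are illustrative only, not anything from
this thread.

```python
RESERVED_MIB = 300  # heap set aside before the fractions apply (assumed from the docs)

def unified_memory_mib(heap_mib, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the unified region and the storage share protected from eviction."""
    usable = heap_mib - RESERVED_MIB
    unified = usable * memory_fraction               # shared by execution and storage
    protected_storage = unified * storage_fraction   # cached blocks immune to eviction
    return unified, protected_storage

heap = 8 * 1024  # hypothetical 8 GiB heap
for mf, sf in [(0.6, 0.5), (0.4, 0.5), (0.6, 0.3)]:
    unified, storage = unified_memory_mib(heap, mf, sf)
    print(f"memory.fraction={mf} storageFraction={sf}: "
          f"unified={unified:.0f} MiB, protected storage={storage:.0f} MiB")
```

Under these assumptions, lowering spark.memory.fraction shrinks the whole
unified region, while lowering spark.memory.storageFraction only shrinks
the part reserved for cached data and leaves the total unchanged.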

On Tue, Jul 20, 2021 at 5:39 PM Kuznetsov, Oleksandr
 wrote:

> Hello,
>
>
>
> I was reading the Garbage Collection Tuning guide here: Tuning - Spark
> 3.1.2 Documentation (apache.org),
> specifically the section on “Advanced GC Tuning”. It is stated that if OldGen
> region is getting full, it is recommended to lower *spark.memory.fraction*.
> I am wondering if this would lower the overall amount of memory available
> for both storage and execution, slowing down execution. Isn’t it better to
> lower *spark.memory.storageFraction* instead?  In this case there is less
> memory available for caching objects, while execution is not being
> affected. Please see below the copy of the passage I am referring to:
>
>
>
> “In the GC stats that are printed, if the OldGen is close to
> being full, reduce the amount of memory used for caching by lowering
> spark.memory.fraction; it is better to cache fewer objects than to slow
> down task execution. Alternatively, consider decreasing the size of the
> Young generation. This means lowering -Xmn if you’ve set it as above. If
> not, try changing the value of the JVM’s NewRatio parameter. Many JVMs
> default this to 2, meaning that the Old generation occupies 2/3 of the
> heap. It should be large enough such that this fraction exceeds
> spark.memory.fraction.”
>
> I would greatly appreciate if you could clarify it for me.
>
>
>


Re: How to specify “positive class” in sparkml classification?

2021-07-07 Thread Sean Owen
The positive class is "1" and negative is "0" by convention; I don't think
you can change that (though you can translate your data if needed).
F1 is defined only in a one-vs-rest sense in multi-class evaluation. You
can set 'metricLabel' to define which class is 'positive' in multiclass -
everything else is 'negative'.
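
As an illustration of what "one-vs-rest" means here, this is a minimal
plain-Python sketch (not the Spark implementation) that computes F1 with
one chosen label treated as positive and everything else as negative. The
function name and the sample data are made up. As far as I know, in Spark
3.0+ the per-label metric names such as "fMeasureByLabel" are the ones that
use metricLabel.

```python
def f1_for_label(labels, predictions, positive):
    """F1 with `positive` as the positive class and all other labels as negative."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    if tp == 0:
        return 0.0  # precision or recall is zero (or undefined), so report 0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

# Made-up labels and predictions, indexed as 0.0 / 1.0 like StringIndexer output
labels      = [1.0, 0.0, 1.0, 1.0, 0.0, 0.0]
predictions = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0]
print(f1_for_label(labels, predictions, positive=1.0))
print(f1_for_label(labels, predictions, positive=0.0))
```

The two calls give different scores for the same predictions, which is why
it matters which index your "positive" class ends up with.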

On Wed, Jul 7, 2021 at 7:19 PM Reed Villanueva 
wrote:

> How to specify the "positive class" in sparkml binary classification? (Or
> perhaps: How does a MulticlassClassificationEvaluator determine
> which class is the "positive" one when evaluating for, say, F1 or even just
> Recall?)
> I have a Pipeline like...
>
> pipeline = Pipeline(stages=[label_idxer, feature_idxer, onehotencoder, 
> assembler, my_ml_algo, label_converter])
>
> crossval = CrossValidator(estimator=pipeline,
>   evaluator=MulticlassClassificationEvaluator(
>   labelCol=my_ml_algo.getLabelCol(),
>   predictionCol=my_ml_algo.getPredictionCol(),
>   metricName="f1"),
>   numFolds=3)
>
> Is there a way to specify which label or index is the positive/negative
> class?
>


Re: Increase batch interval in case of delay

2021-07-01 Thread Sean Owen
Wouldn't this happen naturally? The large batches would just take longer
to complete anyway.

On Thu, Jul 1, 2021 at 6:32 AM András Kolbert 
wrote:

> Hi,
>
> I have a Spark Streaming application which is generally able to process
> the data within the given time frame. However, in certain hours the
> processing time starts increasing, which causes a delay.
>
> In my scenario, the processing time does not increase linearly with the
> number of input records. Hence, ideally I'd like to increase the number of
> batches/records that are being processed after the delay reaches a certain
> time.
>
> Is there a possibility/settings to do so?
>
> Thanks
> Andras
>
>
>


Re: OutOfMemoryError

2021-07-01 Thread Sean Owen
You need to set driver memory before the driver starts, on the CLI or
however you run your app, not in the app itself. By the time the driver
starts to run your app, its heap is already set.
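
A small sketch of the idea, assuming the app is launched through
spark-submit: the driver heap is fixed by a launcher flag, so it has to be
part of the command line rather than the SparkSession builder. The jar path
and class name below are hypothetical placeholders.

```python
import shlex

def spark_submit_argv(app_jar, main_class, driver_memory="24g", master="local[*]"):
    """Build a spark-submit command line; driver memory is set before the JVM starts."""
    return [
        "spark-submit",
        "--master", master,
        "--driver-memory", driver_memory,  # takes effect at JVM launch, unlike .config(...)
        "--class", main_class,
        app_jar,
    ]

# Hypothetical jar and main class, for illustration only
argv = spark_submit_argv("target/oom-app.jar", "com.example.OOM")
print(shlex.join(argv))
```

With this, the .config("spark.driver.memory", "24g") line in the builder
can be dropped, since it has no effect once the driver JVM is running.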

On Thu, Jul 1, 2021 at 12:10 AM javaguy Java  wrote:

> Hi,
>
> I'm getting Java OOM errors even though I'm setting my driver memory to
> 24g and I'm executing against local[*]
>
> I was wondering if anyone can give me any insight.  The server this job is 
> running on has more than enough memory as does the spark driver.
>
> The final result does write 3 csv files that are 300MB each so there's no way
> it's coming close to the 24g
>
> From the OOM, I don't know enough about the internals of Spark itself to tell
> where this is failing + how I should refactor or change anything
>
> Would appreciate any advice on how I can resolve
>
> Thx
>
>
> Parameters here:
>
> val spark = SparkSession
>   .builder
>   .master("local[*]")
>   .appName("OOM")
>   .config("spark.driver.host", "localhost")
>   .config("spark.driver.maxResultSize", "0")
>   .config("spark.sql.caseSensitive", "false")
>   .config("spark.sql.adaptive.enabled", "true")
>   .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
>   .config("spark.driver.memory", "24g")
>   .getOrCreate()
>
>
> My OOM errors are below:
>
> driver): java.lang.OutOfMemoryError: Java heap space
>   at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)
>   at org.apache.spark.storage.DiskBlockObjectWriter$ManualCloseBufferedOutputStream$1.<init>(DiskBlockObjectWriter.scala:109)
>   at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:110)
>   at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:118)
>   at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
>   at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>   at org.apache.spark.scheduler.Task.run(Task.scala:127)
>   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>   at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1792/1058609963.apply(Unknown Source)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>   
>   
>   
>   
> driver): java.lang.OutOfMemoryError: Java heap space
>   at net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:102)
>   at org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:145)
>   at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:158)
>   at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:133)
>   at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:122)
>   at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
>   at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>   at org.apache.spark.scheduler.Task.run(Task.scala:127)
>   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>   at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1792/249605067.apply(Unknown Source)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>
>


Re: Spark Null Pointer Exception

2021-06-30 Thread Sean Owen
The error is in your code, which you don't show. You are almost certainly
incorrectly referencing something like a SparkContext in a Spark task.

On Wed, Jun 30, 2021 at 3:48 PM Amit Sharma  wrote:

> Hi, I am using Spark version 2.7 with Scala. I am calling a method as
> below
>
> 1. val rddBacklog = spark.sparkContext.parallelize(MAs) // MAs is a list of, say, cities
>
> 2. rddBacklog.foreach(ma => doAlloc3Daily(ma, fteReview.forecastId, startYear, endYear))
>
> 3. The doAlloc3Daily method just does a database call and some Scala
> calculation (no RDD or DataFrame)
>
>
> On line 2 I am intermittently getting the NullPointerException below on the
> cluster, but never locally.
>
> java.lang.NullPointerException
>   at sparkStreaming.CalculateFteReview.doAlloc3Daily(CalculateFteReview.scala:1307)
>   at sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199)
>   at sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> Thanks
>
> Amit
>
>
>
>
>
>


Re: Inclusive terminology usage in Spark

2021-06-30 Thread Sean Owen
This was covered and mostly done last year:
https://issues.apache.org/jira/browse/SPARK-32004
In some instances, it's hard to change the terminology as it would break
user APIs, and the marginal benefit may not be worth it, but, have a look
at the remaining task under that umbrella.

On Wed, Jun 30, 2021 at 5:25 AM Rao, Abhishek (Nokia - IN/Bangalore) <
abhishek@nokia.com> wrote:

> Hi,
>
>
>
> Terms such as Blacklist/Whitelist and master/slave are used in various
> places in the Spark code. I wanted to know if there are any plans to change
> these to more inclusive terminology, e.g. Denylist/Allowlist and
> Leader/Follower. If so, what is the timeline?
>
> I’ve also created an improvement ticket to track this.
>
> https://issues.apache.org/jira/browse/SPARK-35952
>
>
>
> Thanks and Regards,
>
> Abhishek
>
>
>


Re: CVEs

2021-06-21 Thread Sean Owen
Yeah if it were clearly exploitable right now we'd handle it via private@
instead of JIRA; depends on what you think the importance is. If in doubt
reply to priv...@spark.apache.org

On Mon, Jun 21, 2021 at 6:50 PM Holden Karau  wrote:

> If you get to a point where you find something you think is highly likely
> a valid vulnerability the best path forward is likely reaching out to
> private@ to figure out how to do a security release.
>
> On Mon, Jun 21, 2021 at 4:42 PM Eric Richardson 
> wrote:
>
>> Thanks for the quick reply. Yes, since it is included in the jars then it
>> is unclear whether it is used internally at least to me.
>>
>> I can substitute the jar in the distro to avoid the scanner from finding
>> it but then it is unclear whether I could be breaking something or not.
>> Given that 3.1.2 is the latest release, I guess you might expect that it
>> would pass the scanners but I am not sure if that version spans 3.0.x and
>> 3.1.x or not either.
>>
>> I can report findings in an issue where I am pretty darn sure it is a
>> valid vulnerability if that is ok? That at least would raise the
>> visibility.
>>
>> Will 3.2.x be Scala 2.13.x only or cross compiled with 2.12?
>>
>> I realize Spark is a beast so I just want to help if I can but also not
>> create extra work if it is not useful for me or the Spark team/contributors.
>>
>> On Mon, Jun 21, 2021 at 3:43 PM Sean Owen  wrote:
>>
>>> Whether it matters really depends on whether the CVE affects Spark.
>>> Sometimes it clearly could and so we'd try to back-port dependency updates
>>> to active branches.
>>> Sometimes it clearly doesn't and hey sometimes the dependency is updated
>>> anyway for good measure (mostly to keep this off static analyzer reports)
>>> but probably wouldn't backport.
>>>
>>> Jackson has been a persistent one but in this case Spark is already on
>>> 2.12.x in master, and it wasn't clear last time I looked at those CVEs that
>>> they can affect Spark itself. End user apps perhaps, but those apps can
>>> supply their own Jackson.
>>>
>>> If someone had a legit view that this is potentially more serious I
>>> think we could probably backport that update, but Jackson can be a little
>>> bit tricky with compatibility IIRC so would just bear some testing.
>>>
>>>
>>> On Mon, Jun 21, 2021 at 5:27 PM Eric Richardson 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am working with Spark 3.1.2 and getting several vulnerabilities
>>>> popping up. I am wondering if the Spark distros are scanned etc. and how
>>>> people resolve these.
>>>>
>>>> For example. I am finding -
>>>> https://nvd.nist.gov/vuln/detail/CVE-2020-25649
>>>>
>>>> This looks like it is fixed in 2.11.0 -
>>>> https://github.com/FasterXML/jackson-databind/issues/2589 - but Spark
>>>> supplies 2.10.0.
>>>>
>>>> Thanks,
>>>> Eric
>>>>
>


Re: CVEs

2021-06-21 Thread Sean Owen
You could comment on https://issues.apache.org/jira/browse/SPARK-35550
which covered the update to Jackson 2.12.3. If there's a decent case for
backporting and it doesn't have major compatibility issues, we can do it.

Then if you have time, try back-porting the patch to branch-3.1 and run
tests. (Or just open the pull request against branch-3.1 and let tests
figure it out). If it passes that's pretty good evidence it's OK.
Or get as far as you can on that and I/we can help backport.

Here were previous comments on compatibility:
https://github.com/apache/spark/pull/32688

3.2 will be Scala 2.12 and possibly experimentally 2.13, but not Scala 2.13
only.



On Mon, Jun 21, 2021 at 6:41 PM Eric Richardson 
wrote:

> Thanks for the quick reply. Yes, since it is included in the jars then it
> is unclear whether it is used internally at least to me.
>
> I can substitute the jar in the distro to avoid the scanner from finding
> it but then it is unclear whether I could be breaking something or not.
> Given that 3.1.2 is the latest release, I guess you might expect that it
> would pass the scanners but I am not sure if that version spans 3.0.x and
> 3.1.x or not either.
>
> I can report findings in an issue where I am pretty darn sure it is a
> valid vulnerability if that is ok? That at least would raise the
> visibility.
>
> Will 3.2.x be Scala 2.13.x only or cross compiled with 2.12?
>
> I realize Spark is a beast so I just want to help if I can but also not
> create extra work if it is not useful for me or the Spark team/contributors.
>
> On Mon, Jun 21, 2021 at 3:43 PM Sean Owen  wrote:
>
>> Whether it matters really depends on whether the CVE affects Spark.
>> Sometimes it clearly could and so we'd try to back-port dependency updates
>> to active branches.
>> Sometimes it clearly doesn't and hey sometimes the dependency is updated
>> anyway for good measure (mostly to keep this off static analyzer reports)
>> but probably wouldn't backport.
>>
>> Jackson has been a persistent one but in this case Spark is already on
>> 2.12.x in master, and it wasn't clear last time I looked at those CVEs that
>> they can affect Spark itself. End user apps perhaps, but those apps can
>> supply their own Jackson.
>>
>> If someone had a legit view that this is potentially more serious I think
>> we could probably backport that update, but Jackson can be a little bit
>> tricky with compatibility IIRC so would just bear some testing.
>>
>>
>> On Mon, Jun 21, 2021 at 5:27 PM Eric Richardson 
>> wrote:
>>
>>> Hi,
>>>
>>> I am working with Spark 3.1.2 and getting several vulnerabilities
>>> popping up. I am wondering if the Spark distros are scanned etc. and how
>>> people resolve these.
>>>
>>> For example. I am finding -
>>> https://nvd.nist.gov/vuln/detail/CVE-2020-25649
>>>
>>> This looks like it is fixed in 2.11.0 -
>>> https://github.com/FasterXML/jackson-databind/issues/2589 - but Spark
>>> supplies 2.10.0.
>>>
>>> Thanks,
>>> Eric
>>>
>>


Re: CVEs

2021-06-21 Thread Sean Owen
Whether it matters really depends on whether the CVE affects Spark.
Sometimes it clearly could and so we'd try to back-port dependency updates
to active branches.
Sometimes it clearly doesn't and hey sometimes the dependency is updated
anyway for good measure (mostly to keep this off static analyzer reports)
but probably wouldn't backport.

Jackson has been a persistent one but in this case Spark is already on
2.12.x in master, and it wasn't clear last time I looked at those CVEs that
they can affect Spark itself. End user apps perhaps, but those apps can
supply their own Jackson.

If someone had a legit view that this is potentially more serious I think
we could probably backport that update, but Jackson can be a little bit
tricky with compatibility IIRC so would just bear some testing.


On Mon, Jun 21, 2021 at 5:27 PM Eric Richardson 
wrote:

> Hi,
>
> I am working with Spark 3.1.2 and getting several vulnerabilities popping
> up. I am wondering if the Spark distros are scanned etc. and how people
> resolve these.
>
> For example. I am finding -
> https://nvd.nist.gov/vuln/detail/CVE-2020-25649
>
> This looks like it is fixed in 2.11.0 -
> https://github.com/FasterXML/jackson-databind/issues/2589 - but Spark
> supplies 2.10.0.
>
> Thanks,
> Eric
>


Re: Why does sparkml random forest classifier not support maxBins < number of total categorical values?

2021-06-16 Thread Sean Owen
I think it's because otherwise you would not be able to consider, at least,
K-1 splits among the K values of a categorical feature, and you want to be
able to do that. There may be more technical reasons in the code that this
is strictly enforced, but it seems like a decent idea. Agreed, more than K
bins doesn't seem to help, but that won't matter much - you'll still get
K-1 possible splits. The value is global to the whole tree, so it may need
to be higher for other categorical features, or of course for continuous
features.
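
To show where "K-1 splits" comes from, here is a plain-Python sketch of one
common approach for binary classification or regression: order the
categories by their average label, then consider only the K-1 cut points in
that ordering. The MLlib decision tree docs describe something similar for
ordered categorical features, but this is an illustration with made-up data
and names, not Spark's code.

```python
from collections import defaultdict

def ordered_category_splits(categories, labels):
    """Order categories by mean label, then return the K-1 'left side' subsets."""
    sums, counts = defaultdict(float), defaultdict(int)
    for c, y in zip(categories, labels):
        sums[c] += y
        counts[c] += 1
    # Sort by mean label; break ties by category name so the order is deterministic
    ordered = sorted(counts, key=lambda c: (sums[c] / counts[c], c))
    return [set(ordered[:i]) for i in range(1, len(ordered))]

# Made-up feature with K = 4 categories and binary labels
cats   = ["a", "b", "c", "d", "a", "b", "c", "d"]
labels = [1, 0, 1, 0, 1, 0, 0, 0]
for left in ordered_category_splits(cats, labels):
    print(sorted(left))
```

Each category needs its own bin for this ordering to be computed, which is
consistent with requiring maxBins to be at least K.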

I don't think this relates to preprocessing. It's a property of the tree.

On Wed, Jun 16, 2021 at 1:33 AM Reed Villanueva 
wrote:

> Why does sparkml's random forest classifier not support maxBins (M)
> < (K) number of total categorical values?
>
> My understanding of decision tree bins is that...
>
>> Statistical data binning is basically a form of quantization where you map
>> a set of numbers with continuous values into *smaller*, more manageable
>> “bins.”
>
>
> https://clevertap.com/blog/numerical-vs-categorical-variables-decision-trees/
>
> ...which makes it seem like you wouldn't ever really want to use M > K in
> any case, yet the docs seem to imply that is not the case.
>
> Must be >=2 and >= number of categories for any categorical feature
>
> Plus, when I use the random forest implementation in H2O, I
> do have the option of using fewer bins than the total number of distinct
> categorical values.
>
> Could anyone explain the reason for this restriction in Spark? Is there
> some kind of particular data preprocessing / feature engineering users are
> expected to have done beforehand? Am I misunderstanding something about
> decision trees (e.g. is it that categorical values don't really ever *need*
> to be binned in the first place and the setting is just for numerical
> values or something)?
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
Where do you see that ... I see 3 executors busy at first. If that's the
crawl then ?

On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:

> Yeah :)
>
> But it's all running through the same node. So I can run multiple tasks of
> the same type on the same node(the driver), but I can't run multiple tasks
> on multiple nodes.
>
> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>
>> Wait. Isn't that what you were trying to parallelize in the first place?
>>
>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>
>>> Yeah but that something else is the crawl being run, which is triggered
>>> from inside the RDDs, because the log output is slowly outputting crawl
>>> data.
>>>
>>>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
That looks like you did some work on the cluster, and now it's stuck doing
something else on the driver - not doing everything on 1 machine.

On Wed, Jun 9, 2021 at 12:43 PM Tom Barber  wrote:

> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>
> Removing the cpu pins gives me more tasks but as you can see here:
>
> https://pasteboard.co/K5Q9GO0.png
>
> It just loads up a single server.
>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
persist() doesn't even persist by itself - it just marks the RDD to be
persisted when it's executed.
The key doesn't matter here, nor partitioning, if this code is inadvertently
trying to run things on the driver.
I don't quite grok what the OSS code you linked to is doing, but it's
running some supplied functions very directly and at a low level with
sc.runJob, which might be part of how this can do something unusual.
How do you trigger any action? What happens after persist()?
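
The laziness point can be seen in a plain-Python analogy (generators, not
the Spark API): building the pipeline does no work, and the function only
runs once something consumes the result. All names here are made up for the
illustration.

```python
log = []

def fetch(url):
    log.append(url)              # stands in for the expensive crawl
    return [url + "/page"]

def lazy_flat_map(func, items):
    """Generator-based flatMap: nothing runs until the result is iterated."""
    for item in items:
        yield from func(item)

pipeline = lazy_flat_map(fetch, ["seed1", "seed2"])
print("after building pipeline:", log)    # no fetch has happened yet
results = list(pipeline)                  # the 'action': forces evaluation
print("after consuming:", log, results)
```

In the same way, where and when the crawl actually runs depends on what
finally evaluates the iterator, not on where flatMap or persist() appear in
the code.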

On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:

> Thanks Mich,
>
> The key on the first iteration is just a string that says "seed", so it is
> indeed on the first crawl the same across all of the groups. Further
> iterations would be different, but I'm not there yet. I was under the
> impression that a repartition would distribute the tasks. Is that not the
> case?
>
> Thanks
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh 
> wrote:
>
>> Hi Tom,
>>
>> Persist() here simply means persist to memory. That is all. You can
>> check the Storage tab in the UI
>>
>>
>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>
>> So I gather the code is stuck from your link in the driver. You stated
>> that you tried repartition() but it did not do anything,
>>
>> Further you stated :
>>
>> " The key is pretty static in these tests, so I have also tried forcing
>> the partition count (50 on a 16 core per node cluster) and also
>> repartitioning, but every time all the jobs are scheduled to run on one
>> node."
>>
>>
>> What is the key?
>>
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> On Wed, 9 Jun 2021 at 15:23, Tom Barber  wrote:
>>
>>> Interesting Sean thanks for that insight, I wasn't aware of that fact, I
>>> assume the .persist() at the end of that line doesn't do it?
>>>
>>> I believe, looking at the output in the SparkUI, it gets to
>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>>> and calls the context runJob.
>>>
>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:
>>>
>>>> All these configurations don't matter at all if this is executing on
>>>> the driver.
>>>> Returning an Iterator in flatMap is fine though it 'delays' execution
>>>> until that iterator is evaluated by something, which is normally fine.
>>>> Does creating this FairFetcher do anything by itself? you're just
>>>> returning an iterator that creates them here.
>>>> How do you actually trigger an action here? the code snippet itself
>>>> doesn't trigger anything.
>>>> I think we need more info about what else is happening in the code.
>>>>
>>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:
>>>>
>>>>> Yeah so if I update the FairFetcher to return a seq it makes no real
>>>>> difference.
>>>>>
>>>>> Here's an image of what I'm seeing just for reference:
>>>>> https://pasteboard.co/K5NFrz7.png
>>>>>
>>>>> Because this is databricks I don't have an actual spark submit command
>>>>> but it looks like this:
>>>>>
>>>>> curl  -d
>>>>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>> "spark.task.cpus":"16"},
>>>>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>>>>> "-Dpf4j.pluginsDir=/d

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
All these configurations don't matter at all if this is executing on the
driver.
Returning an Iterator in flatMap is fine though it 'delays' execution until
that iterator is evaluated by something, which is normally fine.
Does creating this FairFetcher do anything by itself? You're just returning
an iterator that creates them here.
How do you actually trigger an action here? The code snippet itself doesn't
trigger anything.
I think we need more info about what else is happening in the code.

On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:

> Yeah so if I update the FairFetcher to return a seq it makes no real
> difference.
>
> Here's an image of what I'm seeing just for reference:
> https://pasteboard.co/K5NFrz7.png
>
> Because this is databricks I don't have an actual spark submit command but
> it looks like this:
>
> curl  -d
> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
> "spark.task.cpus":"16"},
> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
> "--executor-memory", "10g",
> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
> "-tn", "5000", "-co",
> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>
> I deliberately pinned spark.task.cpus to 16 to stop it swamping the driver
> trying to run all the tasks in parallel on the one node, but again I've got
> 50 tasks queued up all running on the single node.
>
> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:
>
>> I've not run it yet, but I've stuck a toSeq on the end, but in reality a
>> Seq just inherits Iterator, right?
>>
>> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>>
>>> Interesting Jayesh, thanks, I will test.
>>>
>>> All this code is inherited and it runs, but I don't think it's been
>>> tested in a distributed context for about 5 years, but yeah I need to get
>>> this pushed down, so I'm happy to try anything! :)
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
>>> wrote:
>>>
>>>> flatMap is supposed to return Seq, not Iterator. You are returning a
>>>> class that implements Iterator. I have a hunch that's what's causing the
>>>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
>>>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>>>> FairFetcher.
>>>>
>>>> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>>>>
>>>> For anyone interested here's the execution logs up until the point
>>>> where it actually kicks off the workload in question:
>>>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>>
>>>> On 2021/06/09 01:52:39, Tom Barber  wrote:
>>>> > ExecutorID says driver, and looking at the IP addresses it's
>>>> running on, it's not any of the worker IPs.
>>>> >
>>>> > I forcibly told it to create 50, but they'd all end up running in
>>>> the same place.
>>>> >
>>>> > Working on some other ideas, I set spark.task.cpus to 16 to match
>>>> the nodes whilst still forcing it to 50 partitions
>>>> >
>>>> > val m = 50
>>>> >
>>>> > val 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Really weird. flatMap definitely doesn't happen on the driver. My only
long-shot theory, which I haven't thought through: what is FairFetcher
doing with 'job'? It kind of looks like it is submitting a (driver) Job
directly into its scheduler or something, which could explain this. But
maybe that's totally wrong, and I'd much more expect that to fail if
executed this way.

On Tue, Jun 8, 2021 at 8:53 PM Tom Barber  wrote:

> ExecutorID says driver, and looking at the IP addresses it's running on,
> it's not any of the worker IPs.
>
> I forcibly told it to create 50, but they'd all end up running in the same
> place.
>
> Working on some other ideas, I set spark.task.cpus to 16 to match the
> nodes whilst still forcing it to 50 partitions
>
> val m = 50
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job,
> rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> .persist()
>
> that sort of thing. But still the tasks are pinned to the driver executor
> and none of the workers, so I no longer saturate the master node, but I
> also have 3 workers just sat there doing nothing.
>
> On 2021/06/09 01:26:50, Sean Owen  wrote:
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> >
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber 
> wrote:
> >
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this a
> > > bit.
> > >
> > > I don't know if this is possible, impossible or just an old design that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a databricks spark
> cluster
> > > and it's getting to this point in the code (
> > >
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web based crawl over each
> RDD
> > > and returns the results. But when Spark executes it, it runs all the
> crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing
> the
> > > partition count (50 on a 16 core per node cluster) and also
> repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl
> data is
> > > the bottleneck, but that happens after the code has been assigned to a
> node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
>
>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Are you sure it's on the driver, or just one executor?
How many partitions does the groupByKey produce? A small number would limit
your parallelism no matter what.
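
One way to check this is to look at the partition count and at how many
partitions actually hold data. The following is a rough sketch, assuming a
local SparkSession and made-up data with only three distinct keys; the
object name and the data are hypothetical, not taken from Sparkler.

```scala
import org.apache.spark.sql.SparkSession

object PartitionCheckSketch {
  def main(args: Array[String]): Unit = {
    // local[4] is only for trying this out; on a cluster the master
    // normally comes from spark-submit.
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("partition-check-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Made-up data: 1000 records but only 3 distinct group keys.
    val keyed = sc.parallelize(1 to 1000, 8).map(i => (i % 3, i))
    val grouped = keyed.groupByKey(50)

    // The requested number of partitions...
    println(s"partitions: ${grouped.getNumPartitions}")

    // ...versus the number that actually contain a group. With only a few
    // distinct keys this cannot exceed the number of keys, however many
    // partitions were requested.
    val nonEmpty = grouped
      .mapPartitions(it => Iterator(it.hasNext))
      .filter(hasData => hasData)
      .count()
    println(s"non-empty partitions: $nonEmpty")

    spark.stop()
  }
}
```

If the second number is small, most tasks have nothing to do, and the real
work lands on however many tasks received a key.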

On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:

> Hi folks,
>
> Hopefully someone with more Spark experience than me can explain this a
> bit.
>
> I don't know if this is possible, impossible or just an old design that
> could be better.
>
> I'm running Sparkler as a spark-submit job on a databricks spark cluster
> and it's getting to this point in the code (
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> )
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey()
> .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> .persist()
>
> This basically takes the RDD and then runs a web based crawl over each RDD
> and returns the results. But when Spark executes it, it runs all the crawls
> on the driver node and doesn't distribute them.
>
> The key is pretty static in these tests, so I have also tried forcing the
> partition count (50 on a 16 core per node cluster) and also repartitioning,
> but every time all the jobs are scheduled to run on one node.
>
> What can I do better to distribute the tasks? Because the processing of
> the data in the RDD isn't the bottleneck, the fetching of the crawl data is
> the bottleneck, but that happens after the code has been assigned to a node.
>
> Thanks
>
> Tom
>
>
>
>


Re: Problem in Restoring ML Pipeline with UDF

2021-06-08 Thread Sean Owen
It's a little bit of a guess, but the class name
$line103090609224.$read$FeatureModder looks like something generated by the
shell. I think it's your 'real' classname in this case. If you redefined
this later and loaded it you may not find it matches up. Can you declare
this in a package?
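
As a rough sketch of what "declare this in a package" could look like: the
class goes into a compiled source file with a package declaration, is built
into a jar, and that jar is put on the classpath of both the notebook that
saves the pipeline and the one that loads it. The package name
com.example.ml and the file path are hypothetical. The sketch also makes a
few changes that are my assumptions rather than anything confirmed in this
thread: a constructor taking the uid and a companion object extending
DefaultParamsReadable (which, as far as I know, is what the default reader
looks for), an IntParam for size, and defaultCopy in copy (the posted
copy(extra) calls itself).

```scala
// File: src/main/scala/com/example/ml/FeatureModder.scala
// (com.example.ml is a hypothetical package name)
package com.example.ml

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{IntParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class FeatureModder(override val uid: String)
    extends Transformer with DefaultParamsWritable {

  // No-arg constructor for normal use; the uid constructor is used on load.
  def this() = this(Identifiable.randomUID("FeatureModder"))

  final val inputCol = new Param[String](this, "inputCol", "input column")
  final val outputCol = new Param[String](this, "outputCol", "output column")
  final val size = new IntParam(this, "size", "length of output vector")

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)
  def setSize(value: Int): this.type = set(size, value)

  override def transform(data: Dataset[_]): DataFrame = {
    val m = $(size) // local copy so the UDF closure does not capture `this`
    val modUDF = udf((n: Int) => n % m)
    data.withColumn($(outputCol), modUDF(col($(inputCol)).cast(IntegerType)))
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField($(outputCol), IntegerType, nullable = false))

  override def copy(extra: ParamMap): FeatureModder = defaultCopy(extra)
}

// Companion object that provides the static read/load entry points.
object FeatureModder extends DefaultParamsReadable[FeatureModder]
```

With the class compiled this way, the name stored with the saved pipeline
would be com.example.ml.FeatureModder rather than a shell-generated
$line... name, so a different notebook session can resolve it.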

On Tue, Jun 8, 2021 at 10:50 AM Artemis User  wrote:

> We have a feature engineering transformer defined as a custom class with
> UDF as follows:
>
> class FeatureModder extends Transformer with DefaultParamsWritable with
> DefaultParamsReadable[FeatureModder] {
> val uid: String = "FeatureModder"+randomUUID
>
> final val inputCol: Param[String] = new Param[String](this,
> "inputCos", "input column")
> final def setInputCol(col:String) = set(inputCol, col)
>
> final val outputCol: Param[String] = new Param[String](this,
> "outputCol", "output column")
> final def setOutputCol(col:String) = set(outputCol, col)
>
> final val size: Param[String] = new Param[String](this, "size",
> "length of output vector")
> final def setSize = (n:Int) => set(size, n.toString)
>
> override def transform(data: Dataset[_]) = {
> val modUDF = udf({n: Int => n % $(size).toInt})
> data.withColumn($(outputCol),
> modUDF(col($(inputCol)).cast(IntegerType)))
> }
>
> def transformSchema(schema: org.apache.spark.sql.types.StructType):
> org.apache.spark.sql.types.StructType = {
> val actualType = schema($(inputCol)).dataType
> require(actualType.equals(IntegerType) ||
> actualType.equals(DoubleType), s"Input column must be of numeric type")
> DataTypes.createStructType(schema.fields :+
> DataTypes.createStructField($(outputCol), IntegerType, false))
> }
>
> override def copy(extra: ParamMap): Transformer = copy(extra)
> }
>
> This was included in an ML pipeline, fitted into a model and persisted to
> a disk file.  When we try to load the pipeline model in a separate notebook
> (we use Zeppelin), an exception is thrown complaining that the class is not found.
>
> java.lang.ClassNotFoundException: $line103090609224.$read$FeatureModder at
> scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) at
> java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) at
> java.base/java.lang.Class.forName0(Native Method) at
> java.base/java.lang.Class.forName(Class.java:398) at
> org.apache.spark.util.Utils$.classForName(Utils.scala:207) at
> org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstanceReader(ReadWrite.scala:630)
> at
> org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$4(Pipeline.scala:276)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at
> scala.collection.TraversableLike.map(TraversableLike.scala:238) at
> scala.collection.TraversableLike.map$(TraversableLike.scala:231) at
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at
> org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$3(Pipeline.scala:274)
> at
> org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
> at scala.util.Try$.apply(Try.scala:213) at
> org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
> at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:268)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$7(Pipeline.scala:356)
> at org.apache.spark.ml.MLEvents.withLoadInstanceEvent(events.scala:160) at
> org.apache.spark.ml.MLEvents.withLoadInstanceEvent$(events.scala:155) at
> org.apache.spark.ml.util.Instrumentation.withLoadInstanceEvent(Instrumentation.scala:42)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$6(Pipeline.scala:355)
> at
> org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
> at scala.util.Try$.apply(Try.scala:213) at
> org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:355)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:349)
> at org.apache.spark.ml.util.MLReadable.load(ReadWrite.scala:355) at
> org.apache.spark.ml.util.MLReadable.load$(ReadWrite.scala:355) at
> org.apache.spark.ml.PipelineModel$.load(Pipeline.scala:337) ... 40 elided 
> Could
> someone help explaining why?  My guess was the class definition is not in
> the classpath.  The question is how to include the class definition or
> class metadata as part of the pipeline model serialization? or include the
> class definition in a notebook (we did include the class definition in the
> notebook 

Re: Petastorm vs horovod vs tensorflowonspark vs spark_tensorflow_distributor

2021-06-05 Thread Sean Owen
All of these tools are reasonable choices. I don't think the Spark project
itself has a view on what works best. These things do different things. For
example petastorm is not a training framework, but a way to feed data to a
distributed DL training process on Spark. For what it's worth, Databricks
ships Horovod and Petastorm, but that doesn't mean the other projects are
second-class.

On Tue, Jun 1, 2021 at 4:59 PM Gourav Sengupta <
gourav.sengupta.develo...@gmail.com> wrote:

> Dear TD, Matei, Michael, Reynold,
>
> I hope all of you and your loved ones are staying safe and doing well.
>
> as a member of the community the direction from the SPARK mentors is
> getting to be a bit confusing for me and I was wondering if I can seek your
> help.
>
> We have to make long term decisions which is aligned with the open source
> SPARK compatibility and directions and it will be wonderful to know what is
> the most dependable route to get data from SPARK to tensorflow, is it:
> 1. petastorm
> 2. horovod
> 3. tensorflowonspark
> 4. spark_tensorflow_distributor
> or something else.
>
>
> Any comments from you will be super useful.
>
> If I am not wrong, seamless integration between SPARK to tensorflow/
> pytorch was one of the most exciting visions of SPARK 3.x
>
> While using SPARK ML has its own favourite space, I think that tensorflow
> and pytorch will see a lot of focused development as well.
>
>
> Regards,
> Gourav Sengupta
>


Re: Missing module spark-hadoop-cloud in Maven central

2021-05-31 Thread Sean Owen
I know it's not enabled by default when the binary artifacts are built, but
not exactly sure why it's not built separately at all. It's almost a
dependencies-only pom artifact, but there are two source files. Steve, do
you have an angle on that?

On Mon, May 31, 2021 at 5:37 AM Erik Torres  wrote:

> Hi,
>
> I'm following this documentation
>  to
> configure my Spark-based application to interact with Amazon S3. However, I
> cannot find the spark-hadoop-cloud module in Maven central for the
> non-commercial distribution of Apache Spark. From the documentation I would
> expect that I can get this module as a Maven dependency in my project.
> However, I ended up building the spark-hadoop-cloud module from the Spark's
> code .
>
> Is this the expected way to setup the integration with Amazon S3? I think
> I'm missing something here.
>
> Thanks in advance!
>
> Erik
>


Re: [apache spark] Does Spark 2.4.8 have issues with ServletContextHandler

2021-05-27 Thread Sean Owen
Despite the name, the error doesn't mean the class wasn't found; it means
the class could not be initialized. What's the rest of the error?
I don't believe any testing has ever encountered this error, so it's likely
something to do with your environment, but I don't know what.
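
For what it's worth, the difference can be reproduced in a few lines of
plain Scala. This is a sketch with hypothetical names (Broken,
InitFailureSketch), unrelated to Jetty or Spark: the first use of a class
whose initializer throws reports the real cause, and later uses only say
"Could not initialize class".

```scala
// A top-level object whose initializer always fails (simulated failure).
object Broken {
  val value: Int = sys.error("simulated failure in initializer")
}

object InitFailureSketch {
  // Touch Broken and report which throwable class comes back.
  def attempt(): String =
    try Broken.value.toString
    catch { case t: Throwable => t.getClass.getName }

  def main(args: Array[String]): Unit = {
    // First access: the JVM runs the initializer, which throws.
    println(attempt()) // java.lang.ExceptionInInitializerError
    // Later accesses: the class is marked as failed and is never retried.
    println(attempt()) // java.lang.NoClassDefFoundError
  }
}
```

So the useful information is usually in the first failure, which is why the
rest of the error (or an earlier ExceptionInInitializerError in the test
log) matters here.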

On Thu, May 27, 2021 at 7:32 AM Kanchan Kauthale <
kanchankauthal...@gmail.com> wrote:

> Hello,
>
> We have an existing project which works fine with Spark 2.4.7. We want to
> upgrade the spark version to 2.4.8. Scala version we are using is- 2.11
> After building with upgraded pom, we are getting error below for test cases-
>
> java.lang.NoClassDefFoundError: Could not initialize class
> org.spark_object.jetty.servlet.ServletContextHandler
> at
> org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:143)
>
> When I checked in Maven dependencies, I could find the
> ServletContextHandler class under spark-core_2.11-2.4.8.jar in given package
> hierarchy. I have compiled code, dependencies have been resolved, classpath
> is updated.
>
> Any hint regarding this error would help.
>
>
> Thank you
>
> Kanchan
>


Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Sean Owen
No, the work is happening on the cluster; you just have (say) 100 parallel
jobs running at the same time. You apply spark.read.parquet to each dir --
from the driver yes, but spark.read is distributed. At extremes, yes that
would challenge the driver, to manage 1000s of jobs concurrently. You may
also find that if each job is tiny, there's some overhead in running each
as a distributed operation that may be significant. But it seems like the
simplest thing and will probably work fine.

On Tue, May 25, 2021 at 4:34 PM Eric Beabes 
wrote:

> Right... but the problem is still the same, no? Those N Jobs (aka Futures
> or Threads) will all be running on the Driver. Each with its own
> SparkSession. Isn't that going to put a lot of burden on one Machine? Is
> that really distributing the load across the cluster? Am I missing
> something?
>
> Would it be better to use ECS (Elastic Container Service) for this use
> case which allows us to autoscale?
>
> On Tue, May 25, 2021 at 2:16 PM Sean Owen  wrote:
>
>> What you could do is launch N Spark jobs in parallel from the driver.
>> Each one would process a directory you supply with spark.read.parquet, for
>> example. You would just have 10s or 100s of those jobs running at the same
>> time.  You have to write a bit of async code to do it, but it's pretty easy
>> with Scala Futures.
>>
>> On Tue, May 25, 2021 at 3:31 PM Eric Beabes 
>> wrote:
>>
>>> Here's the use case:
>>>
>>> We've a bunch of directories (over 1000) which contain tons of small
>>> files in each. Each directory is for a different customer so they are
>>> independent in that respect. We need to merge all the small files in each
>>> directory into one (or a few) compacted file(s) by using a 'coalesce'
>>> function.
>>>
>>> Clearly we can do this on the Driver by doing something like:
>>>
>>> list.par.foreach (dir =>compact(spark, dir))
>>>
>>> This works but the problem here is that the parallelism happens on
>>> Driver which won't scale when we've 10,000 customers! At any given time
>>> there will be only as many compactions happening as the number of cores on
>>> the Driver, right?
>>>
>>> We were hoping to do this:
>>>
>>> val df = list.toDF()
>>> df.foreach(dir => compact(spark,dir))
>>>
>>> Our hope was, this will distribute the load amongst Spark Executors &
>>> will scale better.  But this throws the NullPointerException shown in the
>>> original email.
>>>
>>> Is there a better way to do this?
>>>
>>>
>>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
>>> silvio.fior...@granturing.com> wrote:
>>>
>>>> Why not just read from Spark as normal? Do these files have different
>>>> or incompatible schemas?
>>>>
>>>>
>>>>
>>>> val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
>>>>
>>>>
>>>>
>>>> *From: *Eric Beabes 
>>>> *Date: *Tuesday, May 25, 2021 at 1:24 PM
>>>> *To: *spark-user 
>>>> *Subject: *Reading parquet files in parallel on the cluster
>>>>
>>>>
>>>>
>>>> I've a use case in which I need to read Parquet files in parallel from
>>>> over 1000+ directories. I am doing something like this:
>>>>
>>>>
>>>>
>>>> val df = list.toList.toDF()
>>>>
>>>> df.foreach(c => {
>>>>   val config = getConfigs()
>>>>   doSomething(spark, config)
>>>> })
>>>>
>>>>
>>>>
>>>> In the doSomething method, when I try to do this:
>>>>
>>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>>
>>>>
>>>>
>>>> I get a NullPointer exception given below. It seems the 'spark.read' only 
>>>> works on the Driver not on the cluster. How can I do what I want to do? 
>>>> Please let me know. Thank you.
>>>>
>>>>
>>>>
>>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID
>>>> 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>>>> java.lang.NullPointerException
>>>> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>> at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>> at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>>>
>>>>
>>>>
>>>>


Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Sean Owen
What you could do is launch N Spark jobs in parallel from the driver. Each
one would process a directory you supply with spark.read.parquet, for
example. You would just have 10s or 100s of those jobs running at the same
time.  You have to write a bit of async code to do it, but it's pretty easy
with Scala Futures.
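
A minimal sketch of that idea follows, under some assumptions: compact() is
a hypothetical stand-in for the per-directory job, the "_compacted" output
suffix and the pool size of 16 are arbitrary, and the directories are taken
from the program arguments. Each Future only submits and waits on a Spark
job from the driver; the reading and writing itself runs on the executors.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.sql.SparkSession

object ParallelCompactionSketch {
  // Hypothetical per-directory job: read the small files, write fewer.
  def compact(spark: SparkSession, dir: String): Unit =
    spark.read.parquet(dir)
      .coalesce(1)
      .write.mode("overwrite")
      .parquet(dir + "_compacted")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("parallel-compaction-sketch").getOrCreate()
    val dirs: Seq[String] = args.toSeq

    // The pool size bounds how many Spark jobs are in flight at once.
    val pool = Executors.newFixedThreadPool(16)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    // One Future per directory, all sharing the same SparkSession.
    val jobs = dirs.map(dir => Future(compact(spark, dir)))

    // Block until every job has finished (or the first failure surfaces).
    Await.result(Future.sequence(jobs), Duration.Inf)

    pool.shutdown()
    spark.stop()
  }
}
```

The threads on the driver spend nearly all their time waiting, so the limit
is how many concurrent jobs the scheduler and the cluster can absorb, not
the number of driver cores.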

On Tue, May 25, 2021 at 3:31 PM Eric Beabes 
wrote:

> Here's the use case:
>
> We've a bunch of directories (over 1000) which contain tons of small files
> in each. Each directory is for a different customer so they are independent
> in that respect. We need to merge all the small files in each directory
> into one (or a few) compacted file(s) by using a 'coalesce' function.
>
> Clearly we can do this on the Driver by doing something like:
>
> list.par.foreach (dir =>compact(spark, dir))
>
> This works but the problem here is that the parallelism happens on Driver
> which won't scale when we've 10,000 customers! At any given time there will
> be only as many compactions happening as the number of cores on the Driver,
> right?
>
> We were hoping to do this:
>
> val df = list.toDF()
> df.foreach(dir => compact(spark,dir))
>
> Our hope was, this will distribute the load amongst Spark Executors & will
> scale better.  But this throws the NullPointerException shown in the
> original email.
>
> Is there a better way to do this?
>
>
> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Why not just read from Spark as normal? Do these files have different or
>> incompatible schemas?
>>
>>
>>
>> val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
>>
>>
>>
>> *From: *Eric Beabes 
>> *Date: *Tuesday, May 25, 2021 at 1:24 PM
>> *To: *spark-user 
>> *Subject: *Reading parquet files in parallel on the cluster
>>
>>
>>
>> I've a use case in which I need to read Parquet files in parallel from
>> over 1000+ directories. I am doing something like this:
>>
>>
>>
>> val df = list.toList.toDF()
>>
>> df.foreach(c => {
>>   val config = getConfigs()
>>   doSomething(spark, config)
>> })
>>
>>
>>
>> In the doSomething method, when I try to do this:
>>
>> val df1 = spark.read.parquet(pathToRead).collect()
>>
>>
>>
>> I get a NullPointer exception given below. It seems the 'spark.read' only 
>> works on the Driver not on the cluster. How can I do what I want to do? 
>> Please let me know. Thank you.
>>
>>
>>
>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9,
>> ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>> java.lang.NullPointerException
>> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>> at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>> at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>
>>
>>
>>


Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Sean Owen
Right, you can't use Spark within Spark.
Do you actually need to read Parquet like this vs spark.read.parquet?
that's also parallel of course.
You'd otherwise be reading the files directly in your function with the
Parquet APIs.
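
For the plain spark.read.parquet route, a sketch along these lines may be
enough. It assumes the directories have compatible schemas and are passed as
program arguments (at least one is needed); the object name and the
source_file column are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

object ReadManyDirsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("read-many-dirs-sketch").getOrCreate()

    // All directories are handed to a single read, called on the driver.
    val paths: Seq[String] = args.toSeq
    val df = spark.read.parquet(paths: _*)
      // Keep track of where each row came from, in case that matters later.
      .withColumn("source_file", input_file_name())

    // The scan is split into tasks that run on the executors.
    df.groupBy("source_file").count().show(truncate = false)

    spark.stop()
  }
}
```

The SparkSession is only touched on the driver here, which avoids the
NullPointerException that comes from using it inside foreach on an executor.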

On Tue, May 25, 2021 at 12:24 PM Eric Beabes 
wrote:

> I've a use case in which I need to read Parquet files in parallel from
> over 1000+ directories. I am doing something like this:
>
> val df = list.toList.toDF()
>
> df.foreach(c => {
>   val config = getConfigs()
>   doSomething(spark, config)
> })
>
>
> In the doSomething method, when I try to do this:
>
> val df1 = spark.read.parquet(pathToRead).collect()
>
>
> I get a NullPointer exception given below. It seems the 'spark.read' only 
> works on the Driver not on the cluster. How can I do what I want to do? 
> Please let me know. Thank you.
>
>
> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9,
> ip-10-0-5-3.us-west-2.compute.internal, executor 11):
> java.lang.NullPointerException
> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
> at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
> at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>
>


Re: unresolved dependency: graphframes#graphframes;0.8.1-spark2.4-s_2.11: not found

2021-05-19 Thread Sean Owen
I think it's because the bintray repo has gone away. Did you see the recent
email about the new repo for these packages?

On Wed, May 19, 2021 at 12:42 PM Wensheng Deng 
wrote:

> Hi experts:
>
> I tried the example as shown on this page, and it is not working for me:
> https://spark-packages.org/package/graphframes/graphframes
>
> Please advise how to proceed. I also tried to unzip the zip file, ran 'sbt
> assembly', and got an error of 'sbt-spark-package;0.2.6: not found'. Is
> there a solution to this?
>
> $ spark-shell --packages graphframes:graphframes:0.8.1-spark2.4-s_2.11
> Ivy Default Cache set to: /Users/wsd57/.ivy2/cache
> The jars for the packages stored in: /Users/wsd57/.ivy2/jars
> :: loading settings :: url =
> jar:file:/Users/wsd57/spark-2.4.0-bin-hadoop2.6/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
> graphframes#graphframes added as a dependency
> :: resolving dependencies ::
> org.apache.spark#spark-submit-parent-eb9df503-f763-49b3-83e9-5afef8b0311d;1.0
>confs: [default]
> :: resolution report :: resolve 1429ms :: artifacts dl 0ms
>:: modules in use:
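>   ---------------------------------------------------------------------
>   |                  |            modules            ||   artifacts   |
>   |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
>   ---------------------------------------------------------------------
>   |      default     |   1   |   0   |   0   |   0   ||   0   |   0   |
>   ---------------------------------------------------------------------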
>
> :: problems summary ::
>  WARNINGS
> module not found: graphframes#graphframes;0.8.1-spark2.4-s_2.11
>
>  local-m2-cache: tried
>
>
>  
> file:/Users/wsd57/.m2/repository/graphframes/graphframes/0.8.1-spark2.4-s_2.11/graphframes-0.8.1-spark2.4-s_2.11.pom
>
>-- artifact
> graphframes#graphframes;0.8.1-spark2.4-s_2.11!graphframes.jar:
>
>
>  
> file:/Users/wsd57/.m2/repository/graphframes/graphframes/0.8.1-spark2.4-s_2.11/graphframes-0.8.1-spark2.4-s_2.11.jar
>
>   local-ivy-cache: tried
>
>
>  
> /Users/wsd57/.ivy2/local/graphframes/graphframes/0.8.1-spark2.4-s_2.11/ivys/ivy.xml
>
>  -- artifact graphframes#graphframes;0.8.1-spark2.4-s_2.11!graphframes.jar:
>
>
>  
> /Users/wsd57/.ivy2/local/graphframes/graphframes/0.8.1-spark2.4-s_2.11/jars/graphframes.jar
>
> central: tried
>
>
> https://repo1.maven.org/maven2/graphframes/graphframes/0.8.1-spark2.4-s_2.11/graphframes-0.8.1-spark2.4-s_2.11.pom
>
>-- artifact
> graphframes#graphframes;0.8.1-spark2.4-s_2.11!graphframes.jar:
>
>
> https://repo1.maven.org/maven2/graphframes/graphframes/0.8.1-spark2.4-s_2.11/graphframes-0.8.1-spark2.4-s_2.11.jar
>
>   spark-packages: tried
>
>
> http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.1-spark2.4-s_2.11/graphframes-0.8.1-spark2.4-s_2.11.pom
>
>  -- artifact
> graphframes#graphframes;0.8.1-spark2.4-s_2.11!graphframes.jar:
>
>
> http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.1-spark2.4-s_2.11/graphframes-0.8.1-spark2.4-s_2.11.jar
>
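> ::::::::::::::::::::::::::::::::::::::::::::::
> ::          UNRESOLVED DEPENDENCIES         ::
> ::::::::::::::::::::::::::::::::::::::::::::::
> :: graphframes#graphframes;0.8.1-spark2.4-s_2.11: not found
> ::::::::::::::::::::::::::::::::::::::::::::::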
>
>
>
> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
> Exception in thread "main" java.lang.RuntimeException: [unresolved
> dependency: graphframes#graphframes;0.8.1-spark2.4-s_2.11: not found]
>  at
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1306)
>  at
> org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54)
>  at
> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:315)
>  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:143)
>  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>  at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> Thank you!
>
> Regards
> W. Deng
>
>


Re: Merge two dataframes

2021-05-17 Thread Sean Owen
Why join here - just add two columns to the DataFrame directly?

On Mon, May 17, 2021 at 1:04 PM Andrew Melo  wrote:

> Anyone have ideas about the below Q?
>
> It seems to me that given that "diamond" DAG, that spark could see
> that the rows haven't been shuffled/filtered, it could do some type of
> "zip join" to push them together, but I've not been able to get a plan
> that doesn't do a hash/sort merge join
>
> Cheers
> Andrew
>
>


Re: [EXTERNAL] Urgent Help - Py Spark submit error

2021-05-15 Thread Sean Owen
If code running on the executors needs a local file such as a config file,
then it does have to be passed this way. That much is normal.

On Sat, May 15, 2021 at 1:41 AM Gourav Sengupta 
wrote:

> Hi,
>
> once again lets start with the requirement. Why are you trying to pass xml
> and json files to SPARK instead of reading them in SPARK?
> Generally when people pass on files they are python or jar files.
>
> Regards,
> Gourav
>


Re: Merge two dataframes

2021-05-12 Thread Sean Owen
Yeah I don't think that's going to work - you aren't guaranteed to get 1,
2, 3, etc. I think row_number() might be what you need to generate a join
ID.

RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You
could .zip two RDDs you get from DataFrames and manually convert the Rows
back to a single Row and back to DataFrame.
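
A minimal sketch of the row_number() idea, using the two small DataFrames
from the question and a local SparkSession. It assumes that the current row
order of each DataFrame is the order you want to pair on, which Spark does
not guarantee in general. The unpartitioned window also pulls all rows into
one partition, so this only suits modest sizes. The object name and the "rn"
column are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

object RowNumberJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("row-number-join-sketch")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(100, 200, 300, 400, 500).toDF("amount_6m")
    val df2 = Seq(500, 600, 700, 800, 900).toDF("amount_9m")

    // monotonically_increasing_id() only provides an ordering here;
    // row_number() turns it into consecutive ids 1, 2, 3, ... on both sides.
    val w = Window.orderBy(monotonically_increasing_id())
    val left = df1.withColumn("rn", row_number().over(w))
    val right = df2.withColumn("rn", row_number().over(w))

    // Join on the generated id, then drop it again.
    left.join(right, Seq("rn")).orderBy("rn").drop("rn").show()

    spark.stop()
  }
}
```

The RDD .zip route has its own requirement: both RDDs must have the same
number of partitions and the same number of elements in each partition,
otherwise the zip fails at runtime.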


On Wed, May 12, 2021 at 10:47 AM kushagra deep 
wrote:

> Thanks Raghvendra
>
> Will the ids for corresponding columns  be same always ? Since
> monotonic_increasing_id() returns a number based on partitionId and the row
> number of the partition  ,will it be same for corresponding columns? Also
> is it guaranteed that the two dataframes will be divided into logical spark
> partitions with the same cardinality for each partition ?
>
> Reg,
> Kushagra Deep
>
> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh 
> wrote:
>
>> You can add an extra id column and perform an inner join.
>>
>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
>>
>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
>>
>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
>>
>> +---------+---------+
>> |amount_6m|amount_9m|
>> +---------+---------+
>> |      100|      500|
>> |      200|      600|
>> |      300|      700|
>> |      400|      800|
>> |      500|      900|
>> +---------+---------+
>>
>>
>> --
>> Raghavendra
>>
>>
>> On Wed, May 12, 2021 at 6:20 PM kushagra deep 
>> wrote:
>>
>>> Hi All,
>>>
>>> I have two dataframes
>>>
>>> df1
>>>
>>> amount_6m
>>>  100
>>>  200
>>>  300
>>>  400
>>>  500
>>>
>>> And a second data df2 below
>>>
>>>  amount_9m
>>>   500
>>>   600
>>>   700
>>>   800
>>>   900
>>>
>>> The number of rows is same in both dataframes.
>>>
>>> Can I merge the two dataframes to achieve below df
>>>
>>> df3
>>>
>>> amount_6m | amount_9m
>>>       100 |       500
>>>       200 |       600
>>>       300 |       700
>>>       400 |       800
>>>       500 |       900
>>>
>>> Thanks in advance
>>>
>>> Reg,
>>> Kushagra Deep
>>>
>>>


Re: Installation Error - Please Help!

2021-05-11 Thread Sean Owen
spark-shell is not on your PATH. Give the full path to it (it is in the bin
directory of the Spark distribution), or add that directory to your PATH.

On Tue, May 11, 2021 at 4:10 PM Talha Javed  wrote:

> Hello Team!
> Hope you are doing well
>
> I have downloaded the Apache Spark version (spark-3.1.1-bin-hadoop2.7). I
> have downloaded the winutils file too from github.
> Python version :Python 3.9.4
> Java version: java version "1.8.0_291"
> Java(TM) SE Runtime Environment (build 1.8.0_291-b10)
> Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
>
> When I enter the command spark-shell in cmd, it gives me this error:
> "'spark-shell' is not recognized as an internal or external command,
> operable program or batch file."
> I am sharing the screenshots of my environment variables. Please help me.
> I am stuck now.
>
> I am looking forward to hearing from you
> Thanks & Regards
>


Re: Issue while calling foreach in Pyspark

2021-05-08 Thread Sean Owen
It looks like the executor (JVM) stops immediately. Hard to say why - do
you have Java installed and a compatible version? I agree it could be a
py4j version problem, from that SO link.
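
One detail in the traceback quoted below may be worth checking: the worker
fails inside pickle.loads while importing filename.py, whose line 10 calls
SparkSession.builder...getOrCreate() at module level. Unpickling a function
that lives in an importable module re-imports that module, so its module-level
code runs again on the executor, where launching a Java gateway is not
expected to work. The sketch below reproduces only the import behaviour with
the standard pickle module; the module name jobmod_demo and the environment
variable are made up for the demonstration, and PySpark's own serializer may
differ in detail.

```python
import importlib
import os
import pickle
import sys
import tempfile
import textwrap

MODULE_SOURCE = textwrap.dedent('''
    import os
    # Stand-in for a module-level `SparkSession.builder.getOrCreate()`.
    os.environ["MODULE_LEVEL_SIDE_EFFECT"] = "ran"

    def process_logs(record):
        return record
''')

def unpickle_runs_module_code():
    """Pickle a function by reference, forget its module, unpickle it, and
    report whether the module-level code ran again."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "jobmod_demo.py"), "w") as fh:
            fh.write(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            mod = importlib.import_module("jobmod_demo")
            payload = pickle.dumps(mod.process_logs)  # stores module + name
            # Simulate a fresh worker process that never imported the module.
            del sys.modules["jobmod_demo"]
            os.environ.pop("MODULE_LEVEL_SIDE_EFFECT", None)
            func = pickle.loads(payload)  # re-imports jobmod_demo
            return os.environ.get("MODULE_LEVEL_SIDE_EFFECT"), func("row")
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("jobmod_demo", None)
            os.environ.pop("MODULE_LEVEL_SIDE_EFFECT", None)

result = unpickle_runs_module_code()
```

If that is the cause here, creating the session inside a function or under an
`if __name__ == "__main__":` guard, so that importing the module has no side
effects, would be one way to avoid it.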

On Sat, May 8, 2021, 1:35 PM rajat kumar  wrote:

> Hi Sean/Mich,
>
> Thanks for response.
>
> That was the full log. Sending again for reference. I am just running
> foreach (lambda) which runs pure Python code.
>
> Exception in read_logs :  Py4JJavaError Traceback (most recent call last):
>   File "/opt/spark/python/lib/python3.6/site-packages/filename.py", line
> 42, in read_logs
> data_df.rdd.foreach(lambda x: process_logs(x))
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in
> foreach
> self.mapPartitions(processPartition).count()  # Force evaluation
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in
> count
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in
> sum
> return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in
> fold
> vals = self.mapPartitions(func).collect()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in
> collect
> sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63,
> in deco
> return f(*a, **kw)
>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
> 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 3.0 (TID 15, 10.244.42.133, executor 1):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in
> main
> func, profiler, deserializer, serializer = read_command(pickleSer,
> infile)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
> read_command
> command = serializer._read_with_length(file)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
> 172, in _read_with_length
> return self.loads(obj)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
> 580, in loads
> return pickle.loads(obj, encoding=encoding)
>   File "/opt/spark/python/lib/python3.6/site-packages/filename.py", line
> 10, in <module>
> spark = SparkSession.builder.appName("test").getOrCreate()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line
> 173, in getOrCreate
> sc = SparkContext.getOrCreate(sparkConf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367,
> in getOrCreate
> SparkContext(conf=conf or SparkConf())
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133,
> in __init__
> SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316,
> in _ensure_initialized
> SparkContext._gateway = gateway or launch_gateway(conf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
> 46, in launch_gateway
> return _launch_gateway(conf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
> 108, in _launch_gateway
> raise Exception("Java gateway process exited before sending its port
> number")
> Exception: Java gateway process exited before sending its port number
>


Re: Issue while calling foreach in Pyspark

2021-05-07 Thread Sean Owen
I don't see any reason to think this is related to YARN.
You haven't shown the actual error @rajat so not sure there is anything to
say.

On Fri, May 7, 2021 at 3:08 PM Mich Talebzadeh 
wrote:

> I have suspicion that this may be caused by your cluster as it appears
> that you are running this in YARN mode like below
>
> spark-submit --master yarn --deploy-mode client xyx.py
>
> What happens if you try running it in local mode?
>
> spark-submit --master local[2] xyx.py
>
> Is this run in a managed cluster like GCP dataproc?
>
> HTH
>
>
> On Fri, 7 May 2021 at 19:17, rajat kumar 
> wrote:
>
>> Thanks Mich and Sean for the response . Yes Sean is right. This is a
>> batch job.
>>
>>   I am having only 10 records in the dataframe still it is giving this
>> exception
>>
>> Following are the full logs.
>>
>> File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line
>> 584, in foreach
>> self.rdd.foreach(f)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in
>> foreach
>> self.mapPartitions(processPartition).count()  # Force evaluation
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in
>> count
>> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in
>> sum
>> return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in
>> fold
>> vals = self.mapPartitions(func).collect()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in
>> collect
>> sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>> line 1257, in __call__
>> answer, self.gateway_client, self.target_id, self.name)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63,
>> in deco
>> return f(*a, **kw)
>>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
>> 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage
>> 3.0 (TID 10, 10.244.158.5, executor 1):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364,
>> in main
>> func, profiler, deserializer, serializer = read_command(pickleSer,
>> infile)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
>> read_command
>> command = serializer._read_with_length(file)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
>> 172, in _read_with_length
>> return self.loads(obj)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
>> 580, in loads
>> return pickle.loads(obj, encoding=encoding)
>>   File
>> "/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py",
>> line 10, in <module>
>> spark = SparkSession.builder.appName("test").getOrCreate()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line
>> 173, in getOrCreate
>> sc = SparkContext.getOrCreate(sparkConf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367,
>> in getOrCreate
>> SparkContext(conf=conf or SparkConf())
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133,
>> in __init__
>> SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316,
>> in

Re: Issue while calling foreach in Pyspark

2021-05-07 Thread Sean Owen
foreach definitely works :)
This is not a streaming question.
The error says that the JVM worker died for some reason. You'd have to look
at its logs to see why.

On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh 
wrote:

> Hi,
>
> I am not convinced foreach works even in 3.1.1
> Try doing the same with foreachBatch
>
>  foreachBatch(sendToSink). \
> trigger(processingTime='2 seconds'). \
>
> and see it works
>
> HTH
>
>
> On Fri, 7 May 2021 at 16:07, rajat kumar 
> wrote:
>
>> Hi Team,
>>
>> I am using Spark 2.4.4 with Python
>>
>> While using below line:
>>
>> dataframe.foreach(lambda record : process_logs(record))
>>
>>
>> My use case is that process_logs will download a file from cloud storage
>> using Python code and then save the processed data.
>>
>> I am getting the following error
>>
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
>> 46, in launch_gateway
>> return _launch_gateway(conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
>> 108, in _launch_gateway
>> raise Exception("Java gateway process exited before sending its port
>> number")
>> Exception: Java gateway process exited before sending its port number
>>
>> Can anyone pls suggest what can be done?
>>
>> Thanks
>> Rajat
>>
>


Re: Broadcast Variable

2021-05-03 Thread Sean Owen
There is just one copy in memory. No different than if you have two
variables pointing to the same dict.
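
The reference semantics can be seen in plain Python. FakeBroadcast below is a
hypothetical stand-in for a broadcast variable, used only so the snippet runs
without Spark; it is not the PySpark class.

```python
class FakeBroadcast:
    """Hypothetical stand-in for a broadcast variable: it holds one object
    and hands out references to it through `.value`."""

    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value

bc_var = FakeBroadcast({"a": 1, "b": 2})

d = bc_var.value                 # binds a second name to the same dict
same_object = d is bc_var.value  # no copy was made

d["c"] = 3                       # a change made through `d` ...
visible = "c" in bc_var.value    # ... is visible through `.value` too
```

Because both names refer to one object, it is safest to treat the dict as
read-only inside the UDF.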

On Mon, May 3, 2021 at 7:54 AM Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi all,
>
>
>
> when broadcasting a large dict containing several million entries to
> executors what exactly happens when calling bc_var.value within a UDF like:
>
>
>
> ..
>
> d = bc_var.value
>
> ..
>
>
>
> Does d receives a copy of the dict inside value or is this handled like a
> pointer?
>
>
>
> Thanks,
> Meikel
>


Re: Cypher language on spark

2021-04-30 Thread Sean Owen
Right, yes it did not continue. It's not in Spark.

On Fri, Apr 30, 2021 at 7:07 AM jonnysettle
 wrote:

> I remember reading back in 2019 about the Cypher language for graph queries
> being introduced in Spark 3.x, but I don't see it in the latest version. Has
> the project been abandoned (issue 25994)? I know there was the Morpheus
> project, but that is not maintained any more.
>
> Have I missed something? :-)
>


Re: Spark DataFrame CodeGeneration in Java generates Scala specific code?

2021-04-29 Thread Sean Owen
From tracing the code a bit, it might do this if the POJO class has no
public constructors - does it?

On Thu, Apr 29, 2021 at 9:55 AM Rico Bergmann  wrote:

> Here is the relevant generated code and the Exception stacktrace.
>
> The problem in the generated code is at line 35.
>
>


Re: Spark DataFrame CodeGeneration in Java generates Scala specific code?

2021-04-29 Thread Sean Owen
I don't know this code well, but yes seems like something is looking for
members of a companion object when there is none here. Can you show any
more of the stack trace or generated code?

On Thu, Apr 29, 2021 at 7:40 AM Rico Bergmann  wrote:

> Hi all!
>
> A simplified code snippet of what my Spark pipeline written in Java does:
>
> public class MyPojo implements Serializable {
>
> ... // some fields with Getter and Setter
>
> }
>
>
> a custom Aggregator (defined in the Driver class):
>
> public static class MyAggregator extends
> org.apache.spark.sql.expressions.Aggregator { ... }
>
>
> in my Driver I do:
>
> Dataset inputDF = ... //some calculations before
>
> inputDF.groupBy("col1", "col2", "col3").agg(new
> MyAggregator().toColumn().name("aggregated"));
>
>
> When executing this part I get a CompileException complaining about an
> unknown variable or type "MyPojo$.MODULE$". For me it looks like the
> CodeGenerator generates code for Scala (since as far as I know .MODULE$
> is a scala specific variable). I tried it with Spark 3.1.1 and Spark 3.0.1.
>
> Does anyone have an idea what's going wrong here?
>
>
> Best,
>
> Rico.


Re: How to calculate percentiles in Scala Spark 2.4.x

2021-04-27 Thread Sean Owen
Erm, just
https://spark.apache.org/docs/2.3.0/api/sql/index.html#approx_percentile ?
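
On the quoted question of how to measure the error: the Spark SQL
documentation describes an optional accuracy argument to approx_percentile
(default 10000) and states that 1.0/accuracy is the relative error of the
approximation. Reading that as a bound on rank, which is an interpretation
and not something stated in this thread, the arithmetic can be sketched as
follows; the helper names are illustrative.

```python
import math

def max_rank_error(n_rows, accuracy=10000):
    """Rows by which the returned value's rank may differ from the target
    rank, reading 1/accuracy as a relative rank error."""
    return n_rows / accuracy

def exact_percentile(values, p):
    """Exact nearest-rank percentile (p in 0..100) for a small sample,
    useful for spot-checking an approximate result."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]

# With 10 billion observations and the default accuracy, the answer may be
# off by up to about one million ranks, i.e. 0.01% of the data.
bound = max_rank_error(10_000_000_000)

sample = [15, 20, 35, 40, 50]
median = exact_percentile(sample, 50)
```

Comparing the approximate result with an exact percentile on a sample small
enough to sort is one practical way to see the error on real data.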

On Tue, Apr 27, 2021 at 3:52 AM Ivan Petrov  wrote:

> Hi, I have billions, potentially dozens of billions of observations. Each
> observation is a decimal number.
> I need to calculate percentiles 1, 25, 50, 75, 95 for these observations
> using Scala Spark. I can use both RDD and Dataset API. Whatever would work
> better.
>
> What I can do in terms of perf optimisation:
> - I can round decimal observations to long
> - I can even round each observation to nearest 5, for example: 2.6 can be
> rounded to 5 or 11.3123123 can be rounded to 10 to reduce amount of unique
> values of observations (if it helps on Math side)
> - I’m fine with some approximation approach that loses some precision (how
> to measure the error, BTW?) but gets percentile results faster.
>
>
> What can I try?
> Thanks!
>


Re: [Spark-Streaming] moving average on categorical data with time windowing

2021-04-26 Thread Sean Owen
You might be able to do this with multiple aggregations on
avg((col("col1") === "cat1").cast("int")) etc, but how about pivoting the
DataFrame first so that you get columns like "cat1" being 1 or 0? You would
end up with (number of columns) x (number of categories) new columns if you
want to count all categories in all cols. But then it's just a simple
aggregation on numeric values.
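
The idea of turning the category test into a 0/1 value and averaging it can
be sketched in plain Python using the sample rows quoted below. This is only
a model of the aggregation, not Spark code; in Spark the same indicator would
be a numeric column inside a single avg aggregation over the window.

```python
from collections import defaultdict

ROWS = [  # (sqltimestamp, col1) taken from the sample data quoted below
    (1618574879, "cat1"), (1618574880, "cat1"), (1618574881, "cat5"),
    (1618574882, "cat2"), (1618574883, "cat2"), (1618574884, "cat1"),
    (1618574885, "cat5"), (1618574886, "cat3"), (1618574887, "cat3"),
    (1618574888, "cat1"),
]

def windowed_share(rows, category, window_seconds=300):
    """Average of the 0/1 indicator (value == category) per tumbling window,
    keyed by window start time in epoch seconds."""
    totals = defaultdict(lambda: [0, 0])  # window start -> [hits, rows]
    for ts, value in rows:
        start = ts - ts % window_seconds
        totals[start][0] += 1 if value == category else 0
        totals[start][1] += 1
    return {start: hits / n for start, (hits, n) in sorted(totals.items())}

# All ten sample rows fall into one 5-minute window; four of them are "cat1".
shares = windowed_share(ROWS, "cat1")
```

Since the average of an indicator is a single aggregate, it does not need a
second aggregation stacked on top of the count.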

On Mon, Apr 26, 2021 at 9:29 AM halil  wrote:

> Hello everyone,
>
> I am trying to apply moving average on categorical data like below, which
> is a synthetic data generated by myself.
>
> sqltimestamp,col1,col2,col3,col4,col5
> 1618574879,cat1,cat4,cat2,cat5,cat3
> 1618574880,cat1,cat3,cat4,cat2,cat5
> 1618574881,cat5,cat3,cat4,cat2,cat1
> 1618574882,cat2,cat3,cat5,cat1,cat4
> 1618574883,cat2,cat4,cat1,cat3,cat5
> 1618574884,cat1,cat2,cat5,cat4,cat3
> 1618574885,cat5,cat3,cat2,cat1,cat4
> 1618574886,cat3,cat5,cat4,cat2,cat1
> 1618574887,cat3,cat2,cat5,cat4,cat1
> 1618574888,cat1,cat5,cat3,cat2,cat4
>
>
>
>
> I would like to take the average of the number of "cat1" in the column
> "col1" for each 5-minute window according to the column "sqltimestamp". I
> solved this when the column is numeric, but I couldn't solve it when the
> column is categorical as above.
>
>
> The code below produces rows of tuples (timestamp, count), and I cannot
> apply the avg aggregate function on the result because Spark does not
> support multiple aggregations on one streaming query.
>
> val movavgDF = spark
>   .readStream
>   .schema(schema)
>   .option("failOnDataLoss", true)
>   .option("delimiter", ",")
>   .csv(inputParameters.csvSinkDir)
>   .withWatermark("sqltimestamp", "5 seconds")
>   .groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
>   .agg(
>     count(when(col("col1") === "cat1", 1)).as("count")
>   )
>   .withColumn("window_start", col("time_frame")("start").cast(TimestampType))
>   .drop("time_frame")
>   .orderBy("window_start")
>
>
> After my searches on the net, I have come to the conclusion that we can do
> it if it is not Structured Streaming, but I need it while streaming.
>
> I would be very happy if you can provide me a solution for this problem.
>
> Thank you very much in advance.
>
> Best,
>
> -halil.
>
>
>
>
>
>


Re: java.lang.IllegalArgumentException: Unsupported class file major version 55

2021-04-23 Thread Sean Owen
This means you compiled with Java 11, but are running on Java < 11. It's
not related to Spark.
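
To see which Java release a class was compiled for, the class file header can
be read directly: after the 0xCAFEBABE magic number come the minor and major
versions, and for Java 5 and later the release is the major version minus 44
(so 55 is Java 11 and 52 is Java 8). A small sketch; the function name is
illustrative:

```python
import struct

def class_file_java_version(header):
    """Map the first 8 bytes of a .class file to the Java release it was
    compiled for (valid for Java 5 and later: major version - 44)."""
    magic, _minor, major = struct.unpack(">IHH", header[:8])
    if magic != 0xCAFEBABE:
        raise ValueError("not a Java class file")
    return major - 44

# Header of a class compiled for Java 11: magic, minor 0, major 55 (0x37).
java11_header = bytes.fromhex("cafebabe00000037")
# Header of a class compiled for Java 8: major 52 (0x34).
java8_header = bytes.fromhex("cafebabe00000034")

versions = (class_file_java_version(java11_header),
            class_file_java_version(java8_header))
```

For a real file, the header would come from reading the first 8 bytes of the
.class file in binary mode.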

On Fri, Apr 23, 2021 at 10:23 AM chansonzhang 
wrote:

> I just update the spark-* version in my pom.xml to match my spark and scala
> environment, and this solved the problem


Re: [Spark Core][Advanced]: Wrong memory allocation on standalone mode cluster

2021-04-18 Thread Sean Owen
Are you sure about the worker mem configuration? What are you setting
--memory to, and what does the worker UI think its memory allocation is?

On Sun, Apr 18, 2021 at 4:08 AM Mohamadreza Rostami <
mohamadrezarosta...@gmail.com> wrote:

> I see a bug in executor memory allocation in the standalone cluster, but I
> can't find which part of the Spark code causes this problem. That's why I
> decided to raise this issue here.
> Assume you have 3 workers, each with 10 CPU cores and 10 gigabytes of memory.
> Assume also you have 2 Spark jobs that run on this cluster of workers, and
> these jobs' configs are set as below:
> -
> job-1:
> executor-memory: 5g
> executor-CPU: 4
> max-cores: 8
> --
> job-2:
> executor-memory: 6g
> executor-CPU: 4
> max-cores: 8
> --
> In this situation, we expect that if we submit both of these jobs, the
> first job submitted gets 2 executors, each with 4 CPU cores and 5g of
> memory, and the second job gets only one executor, on the third worker,
> with 4 CPU cores and 6g of memory, because worker 1 and worker 2 don't have
> enough memory to accept the second job. But surprisingly, we see that one
> of the first or second workers creates an executor for job-2, and the
> worker's memory consumption goes beyond what's allocated to it, taking 11g
> of memory from the operating system.
> Is this behavior normal? I think this can cause undefined behavior
> in the cluster.


Re: Spark Session error with 30s

2021-04-12 Thread Sean Owen
Something is passing this invalid 30s value, yes. Hard to say which
property it is. I'd check if your cluster config sets anything with the
value 30s - whatever is reading this property is not expecting it.
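
Checking the cluster configuration for that value can be scripted. A minimal
sketch, assuming the settings live in Hadoop-style XML files made of
<property> elements with <name> and <value> children; the property names in
the sample are invented for illustration, and the real file locations on the
cluster are not known from this thread.

```python
import xml.etree.ElementTree as ET

# Illustrative Hadoop-style configuration; both property names are made up.
SAMPLE_XML = """
<configuration>
  <property><name>example.retry.count</name><value>3</value></property>
  <property><name>example.restart.timeout</name><value>30s</value></property>
</configuration>
"""

def properties_with_value(xml_text, wanted="30s"):
    """Return the names of properties whose value equals `wanted`."""
    root = ET.fromstring(xml_text)
    hits = []
    for prop in root.iter("property"):
        value = (prop.findtext("value") or "").strip()
        if value == wanted:
            hits.append(prop.findtext("name"))
    return hits

suspects = properties_with_value(SAMPLE_XML)
```

Running the same function over each configuration file on the cluster would
narrow down which property carries the "30s" value.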

On Mon, Apr 12, 2021, 2:25 PM KhajaAsmath Mohammed 
wrote:

> Hi Sean,
>
> Do you think anything that can cause this with DFS client?
>
> java.lang.NumberFormatException: For input string: "30s"
> at
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Long.parseLong(Long.java:589)
> at java.lang.Long.parseLong(Long.java:631)
>
>
>
> at
> org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1429)
> at
> org.apache.hadoop.hdfs.client.impl.DfsClientConf.<init>(DfsClientConf.java:247)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:301)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:285)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2859)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
> at
> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
> at
> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:137)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:183)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:501)
> at
> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:936)
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession
>
> Thanks,
> Asmath
>
> On Mon, Apr 12, 2021 at 2:20 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> I am using the Spark HBase connector provided by Hortonworks. I was able to
>> run without issues in my local environment, but I have this issue in EMR.
>>
>> Thanks,
>> Asmath
>>
>> On Apr 12, 2021, at 2:15 PM, Sean Owen  wrote:
>>
>> 
>> Somewhere you're passing a property that expects a number, but give it
>> "30s". Is it a time property somewhere that really just wants MS or
>> something? But most time properties (all?) in Spark should accept that type
>> of input anyway. Really depends on what property has a problem and what is
>> setting it.
>>
>> On Mon, Apr 12, 2021 at 1:56 PM KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> HI,
>>>
>>> I am getting weird error when running spark job in emr cluster. Same
>>> program runs in my local machine. Is there anything that I need to do to
>>> resolve this?
>>>
>>> 21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
>>> java.lang.NumberFormatException: For input string: "30s"
>>>
>>> I tried the solution mentioned in the link below but it didn't work for
>>> me.
>>>
>>>
>>> https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/
>>>
>>> Thanks,
>>> Asmath
>>>
>>


Re: Spark Session error with 30s

2021-04-12 Thread Sean Owen
Somewhere you're passing a property that expects a number, but give it
"30s". Is it a time property somewhere that really just wants MS or
something? But most time properties (all?) in Spark should accept that type
of input anyway. Really depends on what property has a problem and what is
setting it.
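
The failure mode can be illustrated outside the JVM. The sketch below
contrasts a strict integer parser, which rejects "30s" much as Long.parseLong
does in the stack trace, with a tolerant duration parser. Both functions are
illustrative and are not Spark or Hadoop APIs.

```python
import re

_UNITS_MS = {"ms": 1, "s": 1000, "m": 60_000, "h": 3_600_000}

def strict_long(text):
    """Accept only a bare integer, so "30s" is rejected."""
    return int(text)

def duration_ms(text, default_unit="ms"):
    """Accept either a bare number or a number with a time-unit suffix and
    return the duration in milliseconds."""
    match = re.fullmatch(r"\s*(\d+)\s*(ms|s|m|h)?\s*", text)
    if match is None:
        raise ValueError("not a duration: %r" % text)
    number, unit = match.groups()
    return int(number) * _UNITS_MS[unit or default_unit]

try:
    strict_long("30s")
    strict_failed = False
except ValueError:
    strict_failed = True

tolerant = duration_ms("30s")
```

A mismatch of this kind typically means the code reading the property and
the configuration supplying it expect different formats.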

On Mon, Apr 12, 2021 at 1:56 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> HI,
>
> I am getting weird error when running spark job in emr cluster. Same
> program runs in my local machine. Is there anything that I need to do to
> resolve this?
>
> 21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
> java.lang.NumberFormatException: For input string: "30s"
>
> I tried the solution mentioned in the link below but it didn't work for me.
>
>
> https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/
>
> Thanks,
> Asmath
>


Re: Spark and Bintray's shutdown

2021-04-12 Thread Sean Owen
Spark itself is distributed via Maven Central primarily, so I don't think
it will be affected?

On Mon, Apr 12, 2021 at 11:22 AM Florian CASTELAIN <
florian.castel...@redlab.io> wrote:

> Hello.
>
>
>
> Bintray will shut down on May 1st.
>
>
>
> I just saw that packages are hosted on Bintray (which is currently down for
> maintenance).
>
>
>
> What will happen after May 1st? Is there any maintenance to do in
> projects to still be able to download Spark dependencies?
>
>
>
> Regards !
>


Re: GPU job in Spark 3

2021-04-09 Thread Sean Owen
(I apologize, I totally missed that this should use GPUs because of RAPIDS.
Ignore my previous. But yeah it's more a RAPIDS question.)

On Fri, Apr 9, 2021 at 12:09 PM HaoZ  wrote:

> Hi Martin,
>
> I tested the local mode in Spark on Rapids Accelerator and it works fine
> for
> me.
> The only possible issue is the CUDA 11.2 however the supported CUDA version
> as per https://nvidia.github.io/spark-rapids/docs/download.html is 11.0.
>
> Here is a quick test using Spark local mode.
> Note: When I was testing this local mode, I make sure there is nothing in
> spark-defaults.conf so everything is clean.
>
> ==
> scala> val df = sc.makeRDD(1 to 100, 6).toDF
> df: org.apache.spark.sql.DataFrame = [value: int]
>
> scala> val df2 = sc.makeRDD(1 to 100, 6).toDF
> df2: org.apache.spark.sql.DataFrame = [value: int]
>
> scala> df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a"
> === $"b").count
> res0: Long = 100
> scala> df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a"
> === $"b").explain()
> == Physical Plan ==
> GpuColumnarToRow false
> +- GpuShuffledHashJoin [a#29], [b#31], Inner, GpuBuildRight, false
>    :- GpuShuffleCoalesce 2147483647
>    :  +- GpuColumnarExchange gpuhashpartitioning(a#29, 10), ENSURE_REQUIREMENTS, [id=#221]
>    :     +- GpuProject [value#2 AS a#29]
>    :        +- GpuRowToColumnar TargetSize(2147483647)
>    :           +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
>    :              +- Scan[obj#1]
>    +- GpuCoalesceBatches RequireSingleBatch
>       +- GpuShuffleCoalesce 2147483647
>          +- GpuColumnarExchange gpuhashpartitioning(b#31, 10), ENSURE_REQUIREMENTS, [id=#228]
>             +- GpuProject [value#8 AS b#31]
>                +- GpuRowToColumnar TargetSize(2147483647)
>                   +- *(2) SerializeFromObject [input[0, int, false] AS value#8]
>                      +- Scan[obj#7]
> ==
>
> Thanks,
> Hao


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread Sean Owen
Actually, good question, I'm not sure. I don't think that Spark would
vectorize these operations over rows.
Whereas in a pandas UDF, given a DataFrame, you can apply operations like
sin to 1000s of values at once in native code via numpy. It's trivially
'vectorizable' and I've seen good wins over, at least, a single-row UDF.
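
Whichever approach is used, results can be spot-checked on a few rows against
a plain-Python reference. The sketch below uses the same
spherical-law-of-cosines formula and 6371 km radius as the code quoted below,
plus a clamp (an addition, not in the quoted code) so that rounding cannot
push the acos argument outside [-1, 1].

```python
import math

EARTH_RADIUS_KM = 6371.0

def great_circle_km(long_x, lat_x, long_y, lat_y):
    """Great-circle distance in km between two points given in degrees."""
    lat_x, lat_y = math.radians(lat_x), math.radians(lat_y)
    delta_long = math.radians(long_x) - math.radians(long_y)
    cos_angle = (math.sin(lat_x) * math.sin(lat_y)
                 + math.cos(lat_x) * math.cos(lat_y) * math.cos(delta_long))
    # Clamp: floating-point rounding can yield values slightly above 1.0.
    cos_angle = max(-1.0, min(1.0, cos_angle))
    return math.acos(cos_angle) * EARTH_RADIUS_KM

quarter = great_circle_km(0.0, 0.0, 90.0, 0.0)     # a quarter of the equator
same_point = great_circle_km(12.5, 41.9, 12.5, 41.9)  # approximately zero
```

The same arithmetic applied to whole numpy arrays at once is what makes the
pandas UDF route vectorizable.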

On Fri, Apr 9, 2021 at 9:14 AM ayan guha  wrote:

> Hi Sean - absolutely open to suggestions.
>
> My impression was using spark native functions should provide similar perf
> as scala ones because serialization penalty should not be there, unlike
> native python udfs.
>
> Is it wrong understanding?
>
>
>
> On Fri, 9 Apr 2021 at 10:55 pm, Rao Bandaru  wrote:
>
>> Hi All,
>>
>>
>> Yes, I need to add code for the scenario below to a running Spark job.
>> When I executed it, it took a lot of time to complete. Please suggest the
>> best way to meet the requirement below without using a UDF.
>>
>>
>> Thanks,
>>
>> Ankamma Rao B
>> --
>> *From:* Sean Owen 
>> *Sent:* Friday, April 9, 2021 6:11 PM
>> *To:* ayan guha 
>> *Cc:* Rao Bandaru ; User 
>> *Subject:* Re: [Spark SQL]:to calculate distance between four
>> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk
>> dataframe
>>
>> This can be significantly faster with a pandas UDF, note, because you can
>> vectorize the operations.
>>
>> On Fri, Apr 9, 2021, 7:32 AM ayan guha  wrote:
>>
>> Hi
>>
>> We are using a haversine distance function for this, and wrapping it in
>> udf.
>>
>> from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
>> from pyspark.sql.types import *
>>
>> # Note: the body is built from Spark Column functions, so it returns a
>> # Column expression when applied to columns.
>> def haversine_distance(long_x, lat_x, long_y, lat_y):
>>     return acos(
>>         sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
>>         cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
>>         cos(toRadians(long_x) - toRadians(long_y))
>>     ) * lit(6371.0)  # mean Earth radius in km
>>
>> distudf = udf(haversine_distance, FloatType())
>>
>> in case you just want to use just Spark SQL, you can still utilize the
>> functions shown above to implement in SQL.
>>
>> Any reason you do not want to use UDF?
>>
>> Credit
>> <https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark>
>>
>>
>> On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru  wrote:
>>
>> Hi All,
>>
>>
>>
>> I have a requirement to calculate the distance between four
>> coordinates (Latitude1, Longitude1, Latitude2, Longitude2) in a *PySpark
>> dataframe* with the help of from *geopy* import *distance*, without using a
>> *UDF* (user defined function). Please help with how to achieve this
>> scenario.
>>
>>
>> Thanks,
>>
>> Ankamma Rao B
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: GPU job in Spark 3

2021-04-09 Thread Sean Owen
I don't see anything in this job that would use a GPU?

On Fri, Apr 9, 2021 at 11:19 AM Martin Somers  wrote:

>
> Hi Everyone !!
>
> I'm trying to get an on-premise GPU instance of Spark 3 running on my Ubuntu
> box, and I am following:
>
> https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-on-prem.html#example-join-operation
>
> Does anyone have any insight into why a Spark job isn't being run on the GPU?
> It appears to be all on the CPU. The hadoop binary is installed and appears
> to be functioning fine.
>
> export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
>
> here is my setup on ubuntu20.10
>
>
> ▶ nvidia-smi
>
>
> NVIDIA-SMI 460.39   Driver Version: 460.39   CUDA Version: 11.2
>
> GPU  Name              Persistence-M  Bus-Id    Disp.A
>   0  GeForce RTX 3090  Off            :21:00.0  On
>
> Fan  Temp  Perf  Pwr:Usage/Cap  Memory-Usage       GPU-Util  Compute M.
>  0%  38C   P8    19W / 370W     478MiB / 24265MiB  0%        Default
>
> /opt/sparkRapidsPlugin
>
>
> ▶ ls
> cudf-0.18.1-cuda11.jar  getGpusResources.sh  rapids-4-spark_2.12-0.4.1.jar
>
> ▶ scalac --version
> Scala compiler version 2.13.0 -- Copyright 2002-2019, LAMP/EPFL and
> Lightbend, Inc.
>
>
> ▶ spark-shell --version
> 2021-04-09 17:05:36,158 WARN util.Utils: Your hostname, studio resolves to
> a loopback address: 127.0.1.1; using 192.168.0.221 instead (on interface
> wlp71s0)
> 2021-04-09 17:05:36,159 WARN util.Utils: Set SPARK_LOCAL_IP if you need to
> bind to another address
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor
> java.nio.DirectByteBuffer(long,int)
> WARNING: Please consider reporting this to the maintainers of
> org.apache.spark.unsafe.Platform
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
>       /_/
>
> Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.10
> Branch HEAD
> Compiled by user ubuntu on 2021-02-22T01:04:02Z
> Revision 1d550c4e90275ab418b9161925049239227f3dc9
> Url https://github.com/apache/spark
> Type --help for more information.
>
>
> Here is how I am calling Spark prior to adding the test job:
>
> $SPARK_HOME/bin/spark-shell \
>    --master local \
>    --num-executors 1 \
>    --conf spark.executor.cores=16 \
>    --conf spark.rapids.sql.concurrentGpuTasks=1 \
>    --driver-memory 10g \
>    --conf spark.executor.extraClassPath=${SPARK_CUDF_JAR}:${SPARK_RAPIDS_PLUGIN_JAR} \
>    --conf spark.rapids.memory.pinnedPool.size=16G \
>    --conf spark.locality.wait=0s \
>    --conf spark.sql.files.maxPartitionBytes=512m \
>    --conf spark.sql.shuffle.partitions=10 \
>    --conf spark.plugins=com.nvidia.spark.SQLPlugin \
>    --files $SPARK_RAPIDS_DIR/getGpusResources.sh \
>    --jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR}
>
>
> Test job is from the example join-operation
>
> val df = sc.makeRDD(1 to 1000, 6).toDF
> val df2 = sc.makeRDD(1 to 1000, 6).toDF
> df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" ===
> $"b").count
>
>
> I just noticed that the Scala versions are out of sync. That shouldn't
> affect it, should it?
>
>
> Is there anything else I can try in the --conf settings, or are there any
> logs that show what might be failing behind the scenes? Any suggestions?
>
>
> Thanks
> Martin
>
>
> --
> M
>


Re: possible bug

2021-04-09 Thread Sean Owen
OK so it's '7 threads overwhelming off heap mem in the JVM' kind of
thing. Or running afoul of ulimits in the OS.
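
One way to check the ulimit side of this, as a minimal sketch: the snippet
below prints the soft and hard limits that commonly cap thread creation
(process/thread count, address space, stack size) for the current process.
It assumes a Unix-like OS and that the JVM is launched from the same
environment as this Python process; `describe_limit` is an illustrative
helper name, not part of Spark.

```python
import resource
import threading


def describe_limit(name):
    """Return a readable soft/hard value for a resource limit such as RLIMIT_NPROC."""
    soft, hard = resource.getrlimit(getattr(resource, name))

    def fmt(value):
        return "unlimited" if value == resource.RLIM_INFINITY else str(value)

    return f"{name}: soft={fmt(soft)} hard={fmt(hard)}"


# Limits that can make thread creation or memory mapping fail when they are low.
for limit_name in ("RLIMIT_NPROC", "RLIMIT_AS", "RLIMIT_STACK"):
    print(describe_limit(limit_name))

# Threads in this Python process only; JVM threads have to be inspected separately.
print("threads in this Python process:", threading.active_count())
```

A low soft limit here would be consistent with the pthread_create failure
(EAGAIN) in the reported log, but it would not prove that this is the cause.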

On Fri, Apr 9, 2021 at 11:19 AM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

> Hi Sean!
>
> So the "coalesce" without shuffle will create a CoalescedRDD which, during
> its computation, delegates to the parent RDD partitions.
> As the CoalescedRDD contains only 1 partition, we are talking about 1 task
> and 1 task context.
>
> The next stop is PythonRunner.
>
> Here the Python workers at least are reused (when
> "spark.python.worker.reuse" is true, and true is the default), but the
> MonitorThreads are not reused and, what is worse, all the MonitorThreads are
> created for the same worker and the same TaskContext.
> This means the CoalescedRDD's single task must complete before the
> first monitor thread is stopped; relevant code:
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L570
>
> So this will lead to creating 7 extra threads when 1 would be enough.
>
> The jira is: https://issues.apache.org/jira/browse/SPARK-35009
> The PR will probably follow next week (I am a bit uncertain, as I have many
> other things to do right now).
>
> Best Regards,
> Attila
>
>>
>>>


Re: possible bug

2021-04-09 Thread Sean Owen
Yeah I figured it's not something fundamental to the task or Spark. The
error is very odd, never seen that. Do you have a theory on what's going on
there? I don't!

On Fri, Apr 9, 2021 at 10:43 AM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

> Hi!
>
> I looked into the code and found a way to improve it.
>
> With the improvement your test runs just fine:
>
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
>   /_/
>
> Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
> Spark context Web UI available at http://192.168.0.199:4040
> Spark context available as 'sc' (master = local, app id =
> local-1617982367872).
> SparkSession available as 'spark'.
>
> In [1]: import pyspark
>
> In [2]:
> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>
> In [3]: sc=pyspark.SparkContext.getOrCreate(conf)
>
> In [4]: rows=7
>
> In [5]: data=list(range(rows))
>
> In [6]: rdd=sc.parallelize(data,rows)
>
> In [7]: assert rdd.getNumPartitions()==rows
>
> In [8]: rdd0=rdd.filter(lambda x:False)
>
> In [9]: assert rdd0.getNumPartitions()==rows
>
> In [10]: rdd00=rdd0.coalesce(1)
>
> In [11]: data=rdd00.collect()
> 21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very
> large size (4729 KiB). The maximum recommended task size is 1000 KiB.
>
> In [12]: assert data==[]
>
> In [13]:
>
>
> I will create a jira and need to add some unittest before opening the PR.
>
> Best Regards,
> Attila
>
>>


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread Sean Owen
This can be significantly faster with a pandas UDF, note, because you can
vectorize the operations.
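
A minimal sketch of the vectorized approach, assuming NumPy is available.
The function name `haversine_km` and its argument names are illustrative and
not from this thread; it uses the haversine formula with the same 6371 km
Earth radius as the quoted snippet. The pandas UDF wrapper is shown only as
a comment so that the block runs without a Spark installation.

```python
import numpy as np

EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km; accepts scalars, arrays or pandas Series of degrees."""
    lat1, lon1, lat2, lon2 = (
        np.radians(np.asarray(v, dtype="float64")) for v in (lat1, lon1, lat2, lon2)
    )
    a = (
        np.sin((lat2 - lat1) / 2.0) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2.0) ** 2
    )
    # Clip guards against tiny floating-point overshoot outside [0, 1].
    return 2.0 * EARTH_RADIUS_KM * np.arcsin(np.sqrt(np.clip(a, 0.0, 1.0)))


# With PySpark 3.x, the same function could be wrapped as a pandas UDF
# (sketch, not executed here):
#
#   import pandas as pd
#   from pyspark.sql.functions import pandas_udf
#
#   @pandas_udf("double")
#   def haversine_udf(lat1: pd.Series, lon1: pd.Series,
#                     lat2: pd.Series, lon2: pd.Series) -> pd.Series:
#       return pd.Series(haversine_km(lat1, lon1, lat2, lon2))

print(haversine_km([0.0, 48.8566], [0.0, 2.3522], [0.0, 51.5074], [180.0, -0.1278]))
```

Each call processes a whole batch of rows at once, which is where the speedup
over a row-at-a-time Python UDF comes from.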

On Fri, Apr 9, 2021, 7:32 AM ayan guha  wrote:

> Hi
>
> We are using a haversine distance function for this, and wrapping it in
> a UDF.
>
> from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
> from pyspark.sql.types import *
>
> def haversine_distance(long_x, lat_x, long_y, lat_y):
>     return acos(
>         sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
>         cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
>         cos(toRadians(long_x) - toRadians(long_y))
>     ) * lit(6371.0)
>
> distudf = udf(haversine_distance, FloatType())
>
> In case you want to use just Spark SQL, you can still use the
> functions shown above to implement it in SQL.
>
> Any reason you do not want to use a UDF?
>
> Credit
> 
>
>
> On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru  wrote:
>
>> Hi All,
>>
>>
>>
>> I have a requirement to calculate the distance between four
>> coordinates (Latitude1, Longitude1, Latitude2, Longitude2) in a *PySpark
>> DataFrame* with the help of *geopy* (from geopy import distance), without
>> using a *UDF* (user-defined function). Please advise how to achieve this
>> scenario.
>>
>>
>>
>> Thanks,
>>
>> Ankamma Rao B
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Why is Spark 3.0.x faster than Spark 3.1.x

2021-04-08 Thread Sean Owen
Right, you already established a few times that the difference is the
number of partitions. Russell answered with what is almost surely the
correct answer, that it's AQE. In toy cases it isn't always a win.
Disable it if you need to. It's not a problem per se in 3.1; AQE speeds up
more realistic workloads in general.

On Thu, Apr 8, 2021 at 8:52 AM maziyar  wrote:

> So this is what I have in my Spark UI for 3.0.2 and 3.1.1:
>
> For pyspark==3.0.2 (stage "showString at NativeMethodAccessorImpl.java:0"):
> finished in 10 seconds.
>
> For pyspark==3.1.1 (same stage "showString at
> NativeMethodAccessorImpl.java:0"): finished the same stage in 39 seconds.
>
> As you can see, everything is literally the same between 3.0.2 and 3.1.1
> (number of stages, number of tasks, Input, Output, Shuffle Read, Shuffle
> Write), except that 3.0.2 runs all 12 tasks together while 3.1.1 finishes
> 10/12 and the other 2 are the processing of the actual task, which I shared
> previously: [images: 3.1.1, 3.0.2]
>
> PS: I have just run the same test in Databricks with 1 worker, on 8.1
> (includes Apache Spark 3.1.1, Scala 2.12) and on 7.6 (includes Apache Spark
> 3.0.1, Scala 2.12). There is still a difference of over 20 seconds, which is
> a big bump when the whole process takes under a minute.
>
> Not sure what it is, but until further notice, I will advise our users not
> to use Spark/PySpark 3.1.1 locally or in Databricks. (There are other
> optimizations, so maybe it's not noticeable, but this is such simple code
> and it can become a bottleneck quickly in larger pipelines.)
> --
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: possible bug

2021-04-08 Thread Sean Owen
That's a very low level error from the JVM. Any chance you are
misconfiguring the executor size? like to 10MB instead of 10GB, that kind
of thing. Trying to think of why the JVM would have very little memory to
operate.
An app running out of mem would not look like this.

On Thu, Apr 8, 2021 at 7:53 AM Weiand, Markus, NMA-CFD <
markus.wei...@bertelsmann.de> wrote:

> Hi all,
>
>
>
> I'm using Spark on a c5a.16xlarge machine in the Amazon cloud (so having 64
> cores and 128 GB RAM). I'm using Spark 3.0.1.
>
>
>
> The following Python code leads to an exception. Is this a bug, or is my
> understanding of the API incorrect?
>
>
>
> import pyspark
>
> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>
> sc=pyspark.SparkContext.getOrCreate(conf)
>
> rows=7
>
> data=list(range(rows))
>
> rdd=sc.parallelize(data,rows)
>
> assert rdd.getNumPartitions()==rows
>
> rdd0=rdd.filter(lambda x:False)
>
> assert rdd0.getNumPartitions()==rows
>
> rdd00=rdd0.coalesce(1)
>
> data=rdd00.collect()
>
> assert data==[]
>
>
>
> output when starting from PyCharm:
>
>
>
> /home/ubuntu/PycharmProjects//venv/bin/python
> /opt/pycharm-2020.2.3/plugins/python/helpers/pydev/pydevconsole.py
> --mode=client --port=41185
>
> import sys; print('Python %s on %s' % (sys.version, sys.platform))
>
> sys.path.extend(['/home/ubuntu/PycharmProjects/'])
>
> PyDev console: starting.
>
> Python 3.8.5 (default, Jan 27 2021, 15:41:15)
>
> [GCC 9.3.0] on linux
>
> import os
>
> os.environ['PYTHONHASHSEED'] = '0'
>
> runfile('/home/ubuntu/PycharmProjects//tests/test.py',
> wdir='/home/ubuntu/PycharmProjects//tests')
>
> WARNING: An illegal reflective access operation has occurred
>
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
> java.nio.DirectByteBuffer(long,int)
>
> WARNING: Please consider reporting this to the maintainers of
> org.apache.spark.unsafe.Platform
>
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
>
> WARNING: All illegal access operations will be denied in a future release
>
> 21/04/08 12:12:26 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
> 21/04/08 12:12:29 WARN TaskSetManager: Stage 0 contains a task of very
> large size (4732 KiB). The maximum recommended task size is 1000 KiB.
>
> [Stage 0:>  (0 +
> 1) / 1][423.190s][warning][os,thread] Attempt to protect stack guard pages
> failed (0x7f43d23ff000-0x7f43d2403000).
>
> [423.190s][warning][os,thread] Attempt to deallocate stack guard pages
> failed.
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7f43d300b000, 16384, 0) failed; error='Not enough
> space' (errno=12)
>
> [423.231s][warning][os,thread] Failed to start thread - pthread_create
> failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
>
> #
>
> # There is insufficient memory for the Java Runtime Environment to
> continue.
>
> # Native memory allocation (mmap) failed to map 16384 bytes for committing
> reserved memory.
>
> # An error report file with more information is saved as:
>
> # /home/ubuntu/PycharmProjects//tests/hs_err_pid17755.log
>
> [thread 17966 also had an error]
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7f4b7bd81000, 262144, 0) failed; error='Not enough
> space' (errno=12)
>
> ERROR:root:Exception while sending command.
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1207, in send_command
>
> raise Py4JNetworkError("Answer from Java side is empty")
>
> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1033, in send_command
>
> response = connection.send_command(command)
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1211, in send_command
>
> raise Py4JNetworkError(
>
> py4j.protocol.Py4JNetworkError: Error while receiving
>
> ERROR:py4j.java_gateway:An error occurred while trying to connect to the
> Java server (127.0.0.1:42439)
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
>
> sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__
>
> return_value = get_return_value(
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line
> 334, in get_return_value
>
> raise Py4JError(
>
> py4j.protocol.Py4JError: An error occurred while calling
> 

Re: Apache ML Agorithm Solution

2021-04-07 Thread Sean Owen
I think this question was asked just a week ago? same company and setup.

https://mail-archives.apache.org/mod_mbox/spark-user/202104.mbox/%3CLNXP123MB2604758548BE38E8D3F369EC8A7B9%40LNXP123MB2604.GBRP123.PROD.OUTLOOK.COM%3E

On Wed, Apr 7, 2021 at 11:17 AM SRITHALAM, ANUPAMA (Risk Value Stream)
 wrote:

> Classification: Limited
>
> Hi Team,
>
>
>
> We are trying to use Gradient Boosting Classification algorithm and in
> Python we tried using Sklearn library and in Pyspark we are using ML
> library.
>
>
>
> We have a dataset of around 45k records which is used for training. Training
> takes around 3 to 4 hours in Python, but in PySpark it takes more than
> 18 hours with the same hyperparameters.
>
>
>
> We tried Pyspark by repartitioning the dataframe and can see a little
> improvement in performance but still we are not able to get timings near to
> Python.
>
>
>
> We have a live run which needs to evaluate predictions for 40 million plus
> records, and the data resides in Hadoop. So it is difficult to move that
> amount of data to a different system, convert it to a Pandas dataframe and
> run it through Python.
>
>
>
> So we are trying to train the same model in PySpark, so that we can do
> the evaluation against the trained model in PySpark. The concern we have
> is that the training time is very high, and we want to check what the
> general approach is in this kind of scenario.
>
>
>
>
>
> Thanks,
>
> Anupama.
>


Re: jar incompatibility with Spark 3.1.1 for structured streaming with kafka

2021-04-07 Thread Sean Owen
You shouldn't be modifying your cluster install. You may at this point have
conflicting, excess JARs in there somewhere. I'd start it over if you can.

On Wed, Apr 7, 2021 at 7:15 AM Gabor Somogyi 
wrote:

> Not sure what you mean by not working. You've added 3.1.1 to packages, which
> uses:
> * 2.6.0 kafka-clients:
> https://github.com/apache/spark/blob/1d550c4e90275ab418b9161925049239227f3dc9/pom.xml#L136
> * 2.6.2 commons pool:
> https://github.com/apache/spark/blob/1d550c4e90275ab418b9161925049239227f3dc9/pom.xml#L183
>
> I think it is worth doing an end-to-end dep-tree analysis of what is really
> happening on the cluster...
>
> G
>
>
> On Wed, Apr 7, 2021 at 11:11 AM Mich Talebzadeh 
> wrote:
>
>> Hi Gabor et. al.,
>>
>> To be honest I am not convinced this package --packages
>> org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 is really working!
>>
>> I know for definite that spark-sql-kafka-0-10_2.12-3.1.0.jar works fine.
>> I reported the package working before because under $SPARK_HOME/jars on all
>> nodes there was a copy of the 3.0.1 jar file. Also, in $SPARK_HOME/conf we
>> had the following entries:
>>
>> spark.yarn.archive=hdfs://rhes75:9000/jars/spark-libs.jar
>> spark.driver.extraClassPath    $SPARK_HOME/jars/*.jar
>> spark.executor.extraClassPath  $SPARK_HOME/jars/*.jar
>>
>> So the jar file was picked up first anyway.
>>
>> The concern I have is that the package uses older versions of jar
>> files, namely the following in .ivy2/jars:
>>
>> -rw-r--r-- 1 hduser hadoop 6407352 Dec 19 13:14
>> com.github.luben_zstd-jni-1.4.8-1.jar
>> -rw-r--r-- 1 hduser hadoop  129174 Apr  6  2019
>> org.apache.commons_commons-pool2-2.6.2.jar
>> -rw-r--r-- 1 hduser hadoop 3754508 Jul 28  2020
>> org.apache.kafka_kafka-clients-2.6.0.jar
>> -rw-r--r-- 1 hduser hadoop  387494 Feb 22 03:57
>> org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.1.jar
>> -rw-r--r-- 1 hduser hadoop   55766 Feb 22 03:58
>> org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.1.jar
>> -rw-r--r-- 1 hduser hadoop  649950 Jan 18  2020 org.lz4_lz4-java-1.7.1.jar
>> -rw-r--r-- 1 hduser hadoop   41472 Dec 16  2019
>> org.slf4j_slf4j-api-1.7.30.jar
>> -rw-r--r-- 1 hduser hadoop    2777 Oct 22  2014
>> org.spark-project.spark_unused-1.0.0.jar
>> -rw-r--r-- 1 hduser hadoop 1969177 Nov 28 18:10
>> org.xerial.snappy_snappy-java-1.1.8.2.jar
>>
>>
>> So I am not sure. Hence I want someone to verify this independently in
>> anger
>>
>>


Mesos + Spark users going forward?

2021-04-07 Thread Sean Owen
I noted that Apache Mesos is moving to the attic, so won't be actively
developed soon:
https://lists.apache.org/thread.html/rab2a820507f7c846e54a847398ab20f47698ec5bce0c8e182bfe51ba%40%3Cdev.mesos.apache.org%3E

That doesn't mean people will stop using it as a Spark resource manager
soon. But it suggests the Spark + Mesos integration is a candidate for
deprecation and eventual removal in Spark at some point.

This is mostly an informal poll: are there Mesos users out there planning
to continue using it for a while? moving off? just seeing if it's
reasonable to even start deprecation in 3.2.0.

Sean


Re: jar incompatibility with Spark 3.1.1 for structured streaming with kafka

2021-04-06 Thread Sean Owen
 _ \/ _ `/ __/  '_/
>>>/___/ .__/\_,_/_/ /_/\_\   version 3.1.1
>>>   /_/
>>>
>>> Using Scala version 2.12.9, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_201
>>> Branch HEAD
>>> Compiled by user ubuntu on 2021-02-22T01:33:19Z
>>>
>>>
>>> spark-submit --master yarn --deploy-mode client --conf
>>> spark.pyspark.virtualenv.enabled=true --conf
>>> spark.pyspark.virtualenv.type=native --conf
>>> spark.pyspark.virtualenv.requirements=/home/hduser/dba/bin/python/requirements.txt
>>> --conf
>>> spark.pyspark.virtualenv.bin.path=/usr/src/Python-3.7.3/airflow_virtualenv
>>> --conf
>>> spark.pyspark.python=/usr/src/Python-3.7.3/airflow_virtualenv/bin/python3
>>> --driver-memory 16G --executor-memory 8G --num-executors 4 --executor-cores
>>> 2 xyz.py
>>>
>>> enabling with virtual environment
>>>
>>>
>>> That works fine with any job that does not do structured streaming in a
>>> client mode.
>>>
>>>
>>> Running on local  node with
>>>
>>>
>>> spark-submit --master local[4] --conf
>>> spark.pyspark.virtualenv.enabled=true --conf
>>> spark.pyspark.virtualenv.type=native --conf
>>> spark.pyspark.virtualenv.requirements=/home/hduser/dba/bin/python/requirements.txt
>>> --conf
>>> spark.pyspark.virtualenv.bin.path=/usr/src/Python-3.7.3/airflow_virtualenv
>>> --conf
>>> spark.pyspark.python=/usr/src/Python-3.7.3/airflow_virtualenv/bin/python3
>>> xyz.py
>>>
>>>
>>> works fine with the same spark version and $SPARK_HOME/jars
>>>
>>>
>>> Cheers
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 6 Apr 2021 at 13:20, Sean Owen  wrote:
>>>
>>>> You may be compiling your app against 3.0.1 JARs but submitting to
>>>> 3.1.1.
>>>> You do not in general modify the Spark libs. You need to package libs
>>>> like this with your app at the correct version.
>>>>
>>>> On Tue, Apr 6, 2021 at 6:42 AM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thanks Gabor.
>>>>>
>>>>> All nodes are running Spark /spark-3.1.1-bin-hadoop3.2
>>>>>
>>>>> So $SPARK_HOME/jars contains all the required jars on all nodes
>>>>> including the jar file commons-pool2-2.9.0.jar as well.
>>>>>
>>>>> They are installed identically on all nodes.
>>>>>
>>>>> I have looked at the Spark environment for classpath. Still I don't
>>>>> see the reason why Spark 3.1.1 fails with
>>>>> spark-sql-kafka-0-10_2.12-3.1.1.jar
>>>>> but works ok with spark-sql-kafka-0-10_2.12-3.1.0.jar
>>>>>
>>>>> Anyway I unzipped the tarball for Spark-3.1.1 and there is
>>>>> no spark-sql-kafka-0-10_2.12-3.0.1.jar even
>>>>>
>>>>> I had to add spark-sql-kafka-0-10_2.12-3.0.1.jar to make it work. Then
>>>>> I enquired the availability of new version from Maven that pointed to
>>>>> *spark-sql-kafka-0-10_2.12-3.1.1.jar*
>>>>>
>>>>> So to confirm Spark out of the tarball does not have any
>>>>>
>>>>> ltr spark-sql-kafka-*
>>>>> ls: cannot access spark-sql-kafka-*: No such file or directory
>>>>>
>>>>>
>>>>> For SSS, I had to add these
>>>>>
>>>>> add commons-pool2-2.9.0.jar. The one shipped is
>>>>>  commons-pool-1.5.4.jar!
>>>>>
>>>>> add kafka-clients-2.7.0.jar  Did not have any
>>>>>
>>>>> add  spark-sql-kafka-0-10_2.12-3.0.1.jar  Did not have any
>>>>>
>>>>> I gather from your second mail, there seems to be an issue with
>>>>> spark-sql-kafka-0-10_2.12-3.*1*.1.jar ?
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 6 Apr 2021 at 11:54, Gabor Somogyi 
>>>>> wrote:
>>>>>
>>>>>> Since you've not shared many details, I presume you've updated the
>>>>>> spark-sql-kafka jar only.
>>>>>> KafkaTokenUtil is in the token provider jar.
>>>>>>
>>>>>> As a general note if I'm right, please update Spark as a whole on all
>>>>>> nodes and not just jars independently.
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 6, 2021 at 10:21 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>> Any chance of someone testing  the latest 
>>>>>>> spark-sql-kafka-0-10_2.12-3.1.1.jar
>>>>>>> for Spark. It throws
>>>>>>>
>>>>>>>
>>>>>>> java.lang.NoSuchMethodError:
>>>>>>> org.apache.spark.kafka010.KafkaTokenUtil$.needTokenUpdate(Ljava/util/Map;Lscala/Option;)Z
>>>>>>>
>>>>>>>
>>>>>>> However, the previous version spark-sql-kafka-0-10_2.12-3.0.1.jar
>>>>>>> works fine
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>


Re: Tuning spark job to make count faster.

2021-04-06 Thread Sean Owen
Hard to say without a lot more info, but 76.5K tasks is very large. How big
are the tasks / how long do they take? if very short, you should
repartition down.
Do you end up with 800 executors? if so why 2 per machine? that generally
is a loss at this scale of worker. I'm confused because you have 4000 tasks
running, which would be just 10 per executor as well.
What is the data input format? it's far faster to 'count' parquet as it's
just a metadata read.
Is anything else happening besides count() after the data is read?
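
To make the arithmetic behind these questions concrete, here is a rough
sketch using the figures from the question below. It assumes every vCPU on
every worker is handed to executors, which a real cluster manager would not
quite do; `estimate_parallelism` is an illustrative name.

```python
import math


def estimate_parallelism(workers, vcpus_per_worker, executor_cores, total_tasks):
    """Upper-bound estimate of executors, task slots and scheduling waves."""
    executors_per_worker = vcpus_per_worker // executor_cores
    executors = workers * executors_per_worker
    task_slots = executors * executor_cores
    waves = math.ceil(total_tasks / task_slots)
    return {
        "executors_per_worker": executors_per_worker,
        "executors": executors,
        "task_slots": task_slots,
        "waves": waves,
    }


# 400 workers with 8 vCPUs each, spark.executor.cores=4, 76,553 tasks in total.
print(estimate_parallelism(400, 8, 4, 76_553))
# → {'executors_per_worker': 2, 'executors': 800, 'task_slots': 3200, 'waves': 24}
```

Under these assumptions at most 3200 tasks can run at once, so a report of
about 4000 concurrent tasks suggests the actual executor layout differs from
the stated settings. With roughly 24 waves in 10 minutes, each task would be
quite short, which is the case where repartitioning down tends to help.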


On Tue, Apr 6, 2021 at 2:00 AM Krishna Chakka 
wrote:

> Hi,
>
> I am working on a Spark job. It takes 10 mins just for the
> count() function. *Question: how can I make it faster?*
>
> From the above image, what I understood is that 4001 tasks are
> running in parallel. The total number of tasks is 76,553.
>
> Here are the parameters that I am using for the job
> - master machine type - e2-standard-16
> - worker machine type - e2-standard-8 (8 vcpus, 32 GB memory)
> - number of workers - 400
> - spark.executor.cores - 4
> - spark.executor.memory - 11g
> - spark.sql.shuffle.partitions - 1
>
>
> Please advise how I can make this faster.
>
> Thanks
>
>
>
>
>
>
>
>
>
>
>
>


Re: jar incompatibility with Spark 3.1.1 for structured streaming with kafka

2021-04-06 Thread Sean Owen
You may be compiling your app against 3.0.1 JARs but submitting to 3.1.1.
You do not in general modify the Spark libs. You need to package libs like
this with your app at the correct version.
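
As a small illustration of keeping the connector version in step with the
cluster, the sketch below builds the Maven coordinate for the Kafka connector
from the Spark version and passes it to spark-submit via --packages instead
of copying jars into $SPARK_HOME/jars. The helper names are hypothetical; the
coordinate pattern and the xyz.py application name are taken from the
messages in this thread.

```python
import re


def kafka_connector_coordinate(spark_version, scala_binary="2.12"):
    """Maven coordinate of spark-sql-kafka-0-10 matching the given Spark version."""
    if not re.fullmatch(r"\d+\.\d+\.\d+", spark_version):
        raise ValueError(f"unexpected Spark version: {spark_version!r}")
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary}:{spark_version}"


def submit_command(app, spark_version, scala_binary="2.12"):
    """spark-submit argument list that resolves the connector at submit time."""
    coordinate = kafka_connector_coordinate(spark_version, scala_binary)
    return ["spark-submit", "--packages", coordinate, app]


# The version would normally come from the cluster itself (assumption: 3.1.1 here).
print(" ".join(submit_command("xyz.py", "3.1.1")))
```

Resolving the connector this way also pulls in its matching transitive
dependencies, such as the token provider and kafka-clients jars listed
elsewhere in the thread.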

On Tue, Apr 6, 2021 at 6:42 AM Mich Talebzadeh 
wrote:

> Thanks Gabor.
>
> All nodes are running Spark /spark-3.1.1-bin-hadoop3.2
>
> So $SPARK_HOME/jars contains all the required jars on all nodes including
> the jar file commons-pool2-2.9.0.jar as well.
>
> They are installed identically on all nodes.
>
> I have looked at the Spark environment for classpath. Still I don't see
> the reason why Spark 3.1.1 fails with spark-sql-kafka-0-10_2.12-3.1.1.jar
> but works ok with  spark-sql-kafka-0-10_2.12-3.1.0.jar
>
> Anyway I unzipped the tarball for Spark-3.1.1 and there is
> no spark-sql-kafka-0-10_2.12-3.0.1.jar even
>
> I had to add spark-sql-kafka-0-10_2.12-3.0.1.jar to make it work. Then I
> enquired the availability of new version from Maven that pointed to
> *spark-sql-kafka-0-10_2.12-3.1.1.jar*
>
> So to confirm Spark out of the tarball does not have any
>
> ltr spark-sql-kafka-*
> ls: cannot access spark-sql-kafka-*: No such file or directory
>
>
> For SSS, I had to add these
>
> add commons-pool2-2.9.0.jar. The one shipped is commons-pool-1.5.4.jar!
>
> add kafka-clients-2.7.0.jar  Did not have any
>
> add  spark-sql-kafka-0-10_2.12-3.0.1.jar  Did not have any
>
> I gather from your second mail, there seems to be an issue with
> spark-sql-kafka-0-10_2.12-3.*1*.1.jar ?
>
> HTH
>
>
>
>
> On Tue, 6 Apr 2021 at 11:54, Gabor Somogyi 
> wrote:
>
>> Since you've not shared many details, I presume you've updated the
>> spark-sql-kafka jar only.
>> KafkaTokenUtil is in the token provider jar.
>>
>> As a general note if I'm right, please update Spark as a whole on all
>> nodes and not just jars independently.
>>
>> BR,
>> G
>>
>>
>> On Tue, Apr 6, 2021 at 10:21 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>>
>>> Any chance of someone testing  the latest 
>>> spark-sql-kafka-0-10_2.12-3.1.1.jar
>>> for Spark. It throws
>>>
>>>
>>> java.lang.NoSuchMethodError:
>>> org.apache.spark.kafka010.KafkaTokenUtil$.needTokenUpdate(Ljava/util/Map;Lscala/Option;)Z
>>>
>>>
>>> However, the previous version spark-sql-kafka-0-10_2.12-3.0.1.jar works
>>> fine
>>>
>>>
>>> Thanks
>>>
>>>
>>


Re: FW: Email to Spark Org please

2021-04-01 Thread Sean Owen
Yes that's a great option when the modeling process itself doesn't really
need Spark. You can use any old modeling tool you want and get the
parallelism in tuning via hyperopt's Spark integration.

On Thu, Apr 1, 2021 at 10:50 AM Williams, David (Risk Value Stream)
 wrote:

> Classification: Public
>
>
>
> Many thanks for the info.  So you wouldn’t use sklearn with Spark for
> large datasets but use it with smaller datasets and using hyperopt to build
> models in parallel for hypertuning on Spark?
>
>
>


Re: Error Message Suggestion

2021-03-29 Thread Sean Owen
Sure, just open a pull request?

On Mon, Mar 29, 2021 at 10:37 AM Josh Herzberg  wrote:

> Hi,
>
> I'd like to suggest this change to the PySpark code. I haven't contributed
> before so https://spark.apache.org/contributing.html suggested emailing
> here first.
>
> In the error raised here
> https://github.com/apache/spark/blob/b2bfe985e8adf55e5df5887340fd862776033a06/python/pyspark/worker.py#L141,
> the columns returned can be difficult to identify. The error would be more
> helpful and clear if the columns returned were included in the error
> message like so,
> [image: image.png]
> Happy to help make this happen if I can. Thanks!
>
> Josh
>


Re: Spark Views Functioning

2021-03-26 Thread Sean Owen
Views are simply bookkeeping about how the query is executed, like a
DataFrame. There is no data or result to store; it's just how to run a
query. The views exist on the driver. The query executes like any other, on
the cluster.

On Fri, Mar 26, 2021 at 3:38 AM Mich Talebzadeh 
wrote:

>
> As a first guess, where do you think this view is created in a distributed
> environment?
>
> The whole purpose is fast access to this temporary storage (shared among
> executors in this job) and that storage is only materialised after an
> action is performed.
>
> scala> val sales = spark.read.format("jdbc").options(
>  |Map("url" -> _ORACLEserver,
>  |"dbtable" -> "(SELECT * FROM sh.sales)",
>  |"user" -> _username,
>  |"password" -> _password)).load
> sales: org.apache.spark.sql.DataFrame = [PROD_ID: decimal(38,10), CUST_ID:
> decimal(38,10) ... 5 more fields]
>
> scala> sales.createOrReplaceTempView("sales")
>
> scala> spark.sql("select count(1) from sales").show
> ++
> |count(1)|
> ++
> |  918843|
> ++
>
> HTH
>
>
>
>
> On Fri, 26 Mar 2021 at 06:55, Kushagra Deep 
> wrote:
>
>> Hi all,
>>
>> I just wanted to know that when we create a 'createOrReplaceTempView' on
>> a spark dataset, where does the view reside ? Does all the data come to
>> driver and the view is created ? Or individual executors have part of the
>> views (based on the data each executor has) with them , so that when we
>> query a view, the query runs on each part of data that is there in every
>> executor?
>>
>>
>>
>


Re: FW: Email to Spark Org please

2021-03-26 Thread Sean Owen
Right, it could also be the case that the overhead of distributing it is just
dominating.
You wouldn't use sklearn with Spark; just use sklearn at this scale.

What you _can_ use Spark for easily in this case is distributing parameter
tuning with something like hyperopt. If you're building hundreds of models,
each one can be built with sklearn, with Spark driving the model builds in
parallel as part of a process to tune the hyperparams.

On Fri, Mar 26, 2021 at 8:43 AM Williams, David (Risk Value Stream)
 wrote:

> Classification: Public
>
>
>
> Thanks again Sean.
>
>
>
> We did try increasing the partitions but to no avail.  Maybe it's because
> of the low dataset volumes as you say so the overhead is the bottleneck.
>
>
>
> If we use sklearn in Spark, we have to make some changes to utilize the
> distributed cluster. So if we get that working in distributed, will we get
> benefits similar to spark ML?
>
>
>
> Best Regards,
>
> Dave Williams
>
>
>
> *From:* Sean Owen 
> *Sent:* 26 March 2021 13:20
> *To:* Williams, David (Risk Value Stream)
> 
> *Cc:* user@spark.apache.org
> *Subject:* Re: FW: Email to Spark Org please
>
>
>
>
> *-- This email has reached the Bank via an external source -- *
>
> Simply because the data set is so small. Anything that's operating
> entirely in memory is faster than something splitting the same data across
> multiple machines, running multiple processes, and incurring all the
> overhead of sending the data and results, combining them, etc.
>
>
>
> That said, I suspect that you are not using any parallelism in Spark
> either. You probably have 1 partition, which means at most 1 core is used
> no matter how many are there. Repartition the data set.
>
>
>
> On Fri, Mar 26, 2021 at 8:15 AM Williams, David (Risk Value Stream) <
> david.willi...@lloydsbanking.com.invalid> wrote:
>
> Classification: Limited
>
>
>
> Many thanks for your response Sean.
>
>
>
> Question - why is Spark overkill for this, and why is sklearn faster,
> please?  It’s the same algorithm, right?
>
>
>
> Thanks again,
>
> Dave Williams
>
>
>
> *From:* Sean Owen 
> *Sent:* 25 March 2021 16:40
> *To:* Williams, David (Risk Value Stream) <
> david.willi...@lloydsbanking.com>
> *Cc:* user@spark.apache.org
> *Subject:* Re: FW: Email to Spark Org please
>
>
>
>
> Spark is overkill for this problem; use sklearn.
>
> But I'd suspect that you are using just 1 partition for such a small data
> set, and get no parallelism from Spark.
>
> repartition your input to many more partitions, but, it's unlikely to get
> much faster than in-core sklearn for this task.
>
>
>
> On Thu, Mar 25, 2021 at 11:39 AM Williams, David (Risk Value Stream) <
> david.willi...@lloydsbanking.com.invalid> wrote:
>
> Classification: Public
>
>
>
> Hi Team,
>
>
>
> We are trying to utilize ML Gradient Boosting Tree Classification
> algorithm and found the performance of the algorithm is very poor during
> training.
>
>
>
> We would like to see if we can improve the training time, since it is
> taking 2 days to train on a smaller dataset.
>
>
>
> Our dataset size is 4. Number of features used for training is 564.
>
>
>
> With the same dataset, training with sklearn in Python completes in 3
> hours, but with ML Gradient Boosting it takes 2 days.
>
>
>
> We tried increasing number of executors, executor cores, driver memory etc
> but couldn’t see any improvements.
>
>
>
> The following are the parameters used for training.
>
>
>
> gbt = GBTClassifier(featuresCol='features', labelCol='bad_flag',
> predictionCol='prediction', maxDepth=11,  maxIter=1, stepSize=0.01,
> subsamplingRate=0.5, minInstancesPerNode=110)
>
>
>
> If you could help us with any suggestions to tune this,  that will be
> really helpful
>
>
>
> Many thanks,
>
> Dave Williams
>
> Lloyds Banking Group plc. Registered Office: The Mound, Edinburgh EH1 1YZ.
> Registered in Scotland no. SC95000. Telephone: 0131 225 4555.
>
> Lloyds Bank plc. Registered Office: 25 Gresham Street, London EC2V 7HN.
> Registered in England and Wales no. 2065. Telephone 0207626 1500.
>
> Bank of Scotland plc. Registered Office: The Mound, Edinburgh EH1 1YZ.
> Registered in Scotland no. SC327000. Telephone: 03457 801 801.
>
> Lloyds Bank Corporate Markets plc. Registered office: 25 Gresham Street,
> London EC2V 7HN. Registered in England and Wales no. 10399850.
>
> Scottish Widows Schroder Pe

Re: FW: Email to Spark Org please

2021-03-26 Thread Sean Owen
Simply because the data set is so small. Anything that's operating entirely
in memory is faster than something splitting the same data across multiple
machines, running multiple processes, and incurring all the overhead of
sending the data and results, combining them, etc.

That said, I suspect that you are not using any parallelism in Spark
either. You probably have 1 partition, which means at most 1 core is used
no matter how many are there. Repartition the data set.

On Fri, Mar 26, 2021 at 8:15 AM Williams, David (Risk Value Stream)
 wrote:

> Classification: Limited
>
>
>
> Many thanks for your response Sean.
>
>
>
> Question - why is Spark overkill for this, and why is sklearn faster,
> please?  It’s the same algorithm, right?
>
>
>
> Thanks again,
>
> Dave Williams
>
>
>
> *From:* Sean Owen 
> *Sent:* 25 March 2021 16:40
> *To:* Williams, David (Risk Value Stream) <
> david.willi...@lloydsbanking.com>
> *Cc:* user@spark.apache.org
> *Subject:* Re: FW: Email to Spark Org please
>
>
>
>
> Spark is overkill for this problem; use sklearn.
>
> But I'd suspect that you are using just 1 partition for such a small data
> set, and get no parallelism from Spark.
>
> repartition your input to many more partitions, but, it's unlikely to get
> much faster than in-core sklearn for this task.
>
>
>
> On Thu, Mar 25, 2021 at 11:39 AM Williams, David (Risk Value Stream) <
> david.willi...@lloydsbanking.com.invalid> wrote:
>
> Classification: Public
>
>
>
> Hi Team,
>
>
>
> We are trying to utilize ML Gradient Boosting Tree Classification
> algorithm and found the performance of the algorithm is very poor during
> training.
>
>
>
> We would like to see if we can improve the training time, since it is
> taking 2 days to train on a smaller dataset.
>
>
>
> Our dataset size is 4. Number of features used for training is 564.
>
>
>
> With the same dataset, training with sklearn in Python completes in 3
> hours, but with ML Gradient Boosting it takes 2 days.
>
>
>
> We tried increasing number of executors, executor cores, driver memory etc
> but couldn’t see any improvements.
>
>
>
> The following are the parameters used for training.
>
>
>
> gbt = GBTClassifier(featuresCol='features', labelCol='bad_flag',
> predictionCol='prediction', maxDepth=11,  maxIter=1, stepSize=0.01,
> subsamplingRate=0.5, minInstancesPerNode=110)
>
>
>
> If you could help us with any suggestions to tune this,  that will be
> really helpful
>
>
>
> Many thanks,
>
> Dave Williams
>


Re: convert java dataframe to pyspark dataframe

2021-03-26 Thread Sean Owen
The problem is that both of these are not sharing a SparkContext as far as
I can see, so there is no way to share the object across them, let alone
languages.

You can of course write the data from Java, read it from Python.

In some hosted Spark products, you can access the same session from two
languages and register the DataFrame as a temp view in Java, then access it
in Pyspark.


On Fri, Mar 26, 2021 at 8:14 AM Aditya Singh 
wrote:

> Hi All,
>
> I am a newbie to Spark and am trying to pass a Java dataframe to PySpark.
> The following link has details about what I am trying to do:
>
>
> https://stackoverflow.com/questions/66797382/creating-pysparks-spark-context-py4j-java-gateway-object
>
> Can someone please help me with this?
>
> Thanks,
>


Re: FW: Email to Spark Org please

2021-03-25 Thread Sean Owen
Spark is overkill for this problem; use sklearn.
But I suspect that you are using just 1 partition for such a small data
set, and so get no parallelism from Spark.
Repartition your input into many more partitions, but it's unlikely to get
much faster than in-core sklearn for this task.

On Thu, Mar 25, 2021 at 11:39 AM Williams, David (Risk Value Stream)
 wrote:

> Classification: Public
>
>
>
> Hi Team,
>
>
>
> We are trying to utilize ML Gradient Boosting Tree Classification
> algorithm and found the performance of the algorithm is very poor during
> training.
>
>
>
> We would like to see if we can improve the training time, since it is
> taking 2 days to train on a smaller dataset.
>
>
>
> Our dataset size is 4. Number of features used for training is 564.
>
>
>
> With the same dataset, training with sklearn in Python completes in 3
> hours, but with ML Gradient Boosting it takes 2 days.
>
>
>
> We tried increasing number of executors, executor cores, driver memory etc
> but couldn’t see any improvements.
>
>
>
> The following are the parameters used for training.
>
>
>
> gbt = GBTClassifier(featuresCol='features', labelCol='bad_flag',
> predictionCol='prediction', maxDepth=11,  maxIter=1, stepSize=0.01,
> subsamplingRate=0.5, minInstancesPerNode=110)
>
>
>
> If you could help us with any suggestions to tune this,  that will be
> really helpful
>
>
>
> Many thanks,
>
> Dave Williams
>
>
>


Re: Rdd - zip with index

2021-03-24 Thread Sean Owen
Right, that's all you do to tell it to treat the first line of the files as
a header defining col names.
Yes, .gz files still aren't splittable by nature. One huge .csv file
would be split into partitions, but one .gz file would not, which can be a
problem.
To be clear, you do not need to do anything to let Spark read parts of a
large file in parallel (assuming compression isn't the issue).
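
To illustrate why a .gz file is not splittable "by nature", here is a small plain-Python sketch (standard library only; this is not Spark's own reader, and the generated rows are made up). A reader can start in the middle of plain text and resynchronise at the next newline, but it cannot start decompressing in the middle of a gzip stream.

```python
import gzip
import zlib

# Build a small CSV in memory: one header line plus data rows.
rows = ["id,value"] + [f"{i},{i * i}" for i in range(10_000)]
plain = ("\n".join(rows) + "\n").encode("utf-8")
packed = gzip.compress(plain)

# Plain text can be split at any byte offset: a reader that starts in the
# middle skips ahead to the next newline and parses whole rows from there.
mid = len(plain) // 2
start = plain.index(b"\n", mid) + 1
rows_from_middle = plain[start:].decode("utf-8").splitlines()

# A gzip stream cannot be entered in the middle: the bytes there are not a
# gzip header, and decoding depends on everything that came before.
try:
    gzip.decompress(packed[len(packed) // 2:])
    can_start_mid_stream = True
except (OSError, EOFError, zlib.error):
    can_start_mid_stream = False

print(len(rows_from_middle), can_start_mid_stream)
```

This is why a single large .gz input ends up in one task for the read, and any parallelism has to come from a repartition afterwards.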

On Wed, Mar 24, 2021 at 11:00 AM Mich Talebzadeh 
wrote:

> How does Spark establish there is a csv header as a matter of interest?
>
> Example
>
> val df = spark.read.option("header", true).csv(location)
>
> I need to tell spark to ignore the header correct?
>
> From Spark Read CSV file into DataFrame — SparkByExamples
> <https://sparkbyexamples.com/spark/spark-read-csv-file-into-dataframe/>
>
> If you have a header with column names on file, you need to explicitly
> specify true for header option using option("header",true)
> <https://sparkbyexamples.com/spark/spark-read-csv-file-into-dataframe/#header>.
> Not mentioning this, the API treats header as a data record.
>
> Second point which may not be applicable to the newer versions of Spark. My
> understanding is that the gz file is not splittable, therefore Spark needs
> to read the whole file using a single core which will slow things down (CPU
> intensive). After the read is done the data can be shuffled to increase
> parallelism.
>
> HTH
>
>
>
>
>
>
> On Wed, 24 Mar 2021 at 12:40, Sean Owen  wrote:
>
>> No need to do that. Reading the header with Spark automatically is
>> trivial.
>>
>> On Wed, Mar 24, 2021 at 5:25 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> If it is a csv then it is a flat file somewhere in a directory I guess.
>>>
>>> Get the header out by doing
>>>
>>> */usr/bin/zcat csvfile.gz |head -n 1*
>>> Title Number,Tenure,Property
>>> Address,District,County,Region,Postcode,Multiple Address Indicator,Price
>>> Paid,Proprietor Name (1),Company Registration No. (1),Proprietorship
>>> Category (1),Country Incorporated (1),Proprietor (1) Address (1),Proprietor
>>> (1) Address (2),Proprietor (1) Address (3),Proprietor Name (2),Company
>>> Registration No. (2),Proprietorship Category (2),Country Incorporated
>>> (2),Proprietor (2) Address (1),Proprietor (2) Address (2),Proprietor (2)
>>> Address (3),Proprietor Name (3),Company Registration No. (3),Proprietorship
>>> Category (3),Country Incorporated (3),Proprietor (3) Address (1),Proprietor
>>> (3) Address (2),Proprietor (3) Address (3),Proprietor Name (4),Company
>>> Registration No. (4),Proprietorship Category (4),Country Incorporated
>>> (4),Proprietor (4) Address (1),Proprietor (4) Address (2),Proprietor (4)
>>> Address (3),Date Proprietor Added,Additional Proprietor Indicator
>>>
>>>
>>> 10GB is not much of a big CSV file
>>>
>>> that will resolve the header anyway.
>>>
>>>
>>> Also how are you running the spark, in a local mode (single jvm) or
>>> other distributed modes (yarn, standalone) ?
>>>
>>>
>>> HTH
>>>
>>


Re: Rdd - zip with index

2021-03-24 Thread Sean Owen
No need to do that. Reading the header with Spark automatically is trivial.

On Wed, Mar 24, 2021 at 5:25 AM Mich Talebzadeh 
wrote:

> If it is a csv then it is a flat file somewhere in a directory I guess.
>
> Get the header out by doing
>
> */usr/bin/zcat csvfile.gz |head -n 1*
> Title Number,Tenure,Property
> Address,District,County,Region,Postcode,Multiple Address Indicator,Price
> Paid,Proprietor Name (1),Company Registration No. (1),Proprietorship
> Category (1),Country Incorporated (1),Proprietor (1) Address (1),Proprietor
> (1) Address (2),Proprietor (1) Address (3),Proprietor Name (2),Company
> Registration No. (2),Proprietorship Category (2),Country Incorporated
> (2),Proprietor (2) Address (1),Proprietor (2) Address (2),Proprietor (2)
> Address (3),Proprietor Name (3),Company Registration No. (3),Proprietorship
> Category (3),Country Incorporated (3),Proprietor (3) Address (1),Proprietor
> (3) Address (2),Proprietor (3) Address (3),Proprietor Name (4),Company
> Registration No. (4),Proprietorship Category (4),Country Incorporated
> (4),Proprietor (4) Address (1),Proprietor (4) Address (2),Proprietor (4)
> Address (3),Date Proprietor Added,Additional Proprietor Indicator
>
>
> 10GB is not much of a big CSV file
>
> that will resolve the header anyway.
>
>
> Also how are you running the spark, in a local mode (single jvm) or
> other distributed modes (yarn, standalone) ?
>
>
> HTH
>


Re: Rdd - zip with index

2021-03-23 Thread Sean Owen
It would split 10GB of CSV into multiple partitions by default, unless it's
gzipped. Something else is going on here.

On Tue, Mar 23, 2021 at 10:04 PM "Yuri Oleynikov (יורי אולייניקוב)" <
yur...@gmail.com> wrote:

> I’m not a Spark core developer and do not want to confuse you, but it seems
> logical to me that just reading from a single file (no matter what format
> the file is in) gives no parallelism unless you repartition by some
> column just after the csv load. But if you’re saying you’ve already tried
> repartition with no luck...
>
>
> > On 24 Mar 2021, at 03:47, KhajaAsmath Mohammed 
> wrote:
> >
> > So spark by default doesn’t split the large 10gb file when loaded?
> >
> >> On Mar 23, 2021, at 8:44 PM, Yuri Oleynikov (יורי אולייניקוב) <
> yur...@gmail.com> wrote:
> >>
> >> Hi, Mohammed
> >> I think that the reason that only one executor is running and have
> single partition is because you have single file that might be read/loaded
> into memory.
> >>
> >> In order to achieve better parallelism I’d suggest to split the csv
> file.
> >>
>
>


Re: Rdd - zip with index

2021-03-23 Thread Sean Owen
I don't think that would change partitioning. Try .repartition(). It isn't
necessary to write it out, let alone in Avro.

On Tue, Mar 23, 2021 at 8:45 PM "Yuri Oleynikov (יורי אולייניקוב)" <
yur...@gmail.com> wrote:

> Hi, Mohammed
> I think the reason that only one executor is running with a single
> partition is that you have a single file that might be read/loaded into
> memory.
>
> To achieve better parallelism I’d suggest splitting the csv file.
>
> Another question: why are you using rdd?
> Just spark.read.option("header",
> true).csv(...).select(...).write.format("avro").save(...)
>
>
> > On 24 Mar 2021, at 03:19, KhajaAsmath Mohammed 
> wrote:
> >
> > Hi,
> >
> > I have a 10 GB file that should be loaded into a Spark dataframe. This
> > file is a csv with a header, and we were using rdd.zipWithIndex to get
> > the column names and convert to Avro accordingly.
> >
> > I am assuming this is taking a long time and only one executor runs and
> > never achieves parallelism. Is there an easy way to achieve parallelism
> > after filtering out the header?
> >
> > I am also interested in a solution that can remove the header from the
> > file so that I can give my own schema. This way I can split the files.
> >
> > rdd.partitions is always 1 for this, even after repartitioning the
> > dataframe after zipWithIndex. Any help on this topic, please.
> >
> > Thanks,
> > Asmath
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>


Re: Repartition or Coalesce not working

2021-03-22 Thread Sean Owen
You need to do something with the result of repartition. You haven't
changed textDF; repartition returns a new DataFrame.

On Mon, Mar 22, 2021, 12:15 PM KhajaAsmath Mohammed 
wrote:

> Hi,
>
> I have a use case where there are large files in hdfs.
>
> Size of the file is 3 GB.
>
> It is an existing code in production and I am trying to improve the
> performance of the job.
>
> Sample Code:
> textDF=dataframe ( This is dataframe that got created from hdfs path)
> logging.info("Number of partitions"+str(txt_df.rdd.getNumPartitions()))
> --> Prints 1
> textDF.repartition(100)
> logging.info("Number of partitions"+str(txt_df.rdd.getNumPartitions()))
> --> Prints 1
>
> Any suggestions  on why this is happening?
>
> Next Block of the code which takes time:
> rdd.filter(lambda line: len(line)!=collistlenth)
>
> any way to parallelize and speed up my process on this?
>
> Thanks,
> Asmath
>


Re: Spark version verification

2021-03-21 Thread Sean Owen
I believe you can "SELECT version()" in Spark SQL to see the build version.

On Sun, Mar 21, 2021 at 4:41 AM Mich Talebzadeh 
wrote:

> Thanks for the detailed info.
>
> I was hoping that one can find a simpler answer to the Spark version than
> doing forensic examination on base code so to speak.
>
> The primer for this verification is that on GCP dataprocs originally built
> on 3.1.1-rc2, there was an issue with running Spark Structured Streaming
> (SSS) which I reported to this forum before.
>
> After a while and me reporting to Google, they have now upgraded the base
> to Spark 3.1.1 itself. I am not privy to how they did the upgrade itself.
>
> In the meantime we installed 3.1.1 on-premise and ran it with the same
> Python code for SSS. It worked fine.
>
> However, when I run the same code on GCP dataproc upgraded to 3.1.1,
> occasionally I see this error
>
> 21/03/18 16:53:38 ERROR org.apache.spark.scheduler.AsyncEventQueue:
> Listener EventLoggingListener threw an exception
>
> java.util.ConcurrentModificationException
>
> at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
>
> This may be for other reasons or the consequence of upgrading from
> 3.1.1-rc2 to 3.1.1?
>
>
>
>
>
>
>
> On Sat, 20 Mar 2021 at 22:41, Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> I would check out the Spark source, then diff those two RCs (first just
>> take a look at the list of the changed files):
>>
>> $ git diff v3.1.1-rc1..v3.1.1-rc2 --stat
>> ...
>>
>> The shell scripts in the release can be checked very easily:
>>
>> $ git diff v3.1.1-rc1..v3.1.1-rc2 --stat | grep ".sh "
>>  bin/docker-image-tool.sh   |   6 +-
>>  dev/create-release/release-build.sh|   2 +-
>>
>> We are lucky as *docker-image-tool.sh* is part of the released version.
>> Is it from v3.1.1-rc2 or v3.1.1-rc1?
>>
>> Of course this only works if docker-image-tool.sh is not changed from
>> v3.1.1-rc2 back to v3.1.1-rc1.
>> So let's continue with the Python (and later with the R) files:
>>
>> $ git diff v3.1.1-rc1..v3.1.1-rc2 --stat | grep ".py "
>>  python/pyspark/sql/avro/functions.py   |   4 +-
>>  python/pyspark/sql/dataframe.py|   1 +
>>  python/pyspark/sql/functions.py| 285 +--
>>  .../pyspark/sql/tests/test_pandas_cogrouped_map.py |  12 +
>>  python/pyspark/sql/tests/test_pandas_map.py|   8 +
>> ...
>>
>> After you have enough proof you can stop (how much is enough is for you
>> to decide).
>> Finally you can use javap / scalap on the classes from the jars and check
>> some code changes, which are harder to analyze than a simple text
>> file.
>>
>> Best Regards,
>> Attila
>>
>>
>> On Thu, Mar 18, 2021 at 4:09 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> What would be a signature in the Spark version or binaries that confirms
>>> the release is built on Spark 3.1.1 as opposed to 3.1.1-RC-1 or RC-2?
>>>
>>> Thanks
>>>
>>> Mich
>>>
>>>
>>>
>>>
>>>
>>


Re: Submitting insert query from beeline failing on executor server with java 11

2021-03-16 Thread Sean Owen
That looks like you didn't compile with Java 11 actually. How did you try
to do so?

On Tue, Mar 16, 2021, 7:50 AM kaki mahesh raja 
wrote:

> HI All,
>
> We have compiled Spark with Java 11 ("11.0.9.1"), and when testing the
> thrift server we are seeing an insert query from an operator using beeline
> fail with the below error.
>
> {"type":"log", "level":"ERROR", "time":"2021-03-15T05:06:09.559Z",
> "timezone":"UTC", "log":"Uncaught exception in thread
> blk_1077144750_3404529@[DatanodeInfoWithStorage[10.75.47.159:1044
> ,DS-1678921c-3fe6-4015-9849-bd1223c23369,DISK],
> DatanodeInfoWithStorage[10.75.47.158:1044
> ,DS-0b440eb7-fa7e-4ad8-bb5a-cdc50f3e7660,DISK]]"}
> java.lang.NoSuchMethodError: 'sun.misc.Cleaner
> sun.nio.ch.DirectBuffer.cleaner()'
> at
>
> org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
> ~[hadoop-common-2.10.1.jar:?]
> at
>
> org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:780)
> ~[hadoop-common-2.10.1.jar:?]
> at
>
> org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:322)
> ~[hadoop-common-2.10.1.jar:?]
> at java.io.FilterInputStream.close(FilterInputStream.java:180)
> ~[?:?]
> at
> org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:1003)
> ~[hadoop-hdfs-client-2.10.1.jar:?]
> at
> org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:845)
> ~[hadoop-hdfs-client-2.10.1.jar:?]
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:840)
> ~[hadoop-hdfs-client-2.10.1.jar:?]
> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.570Z",
> "timezone":"UTC", "log":"unwrapping token of length:54"}
> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.599Z",
> "timezone":"UTC", "log":"IPC Client (1437736861) connection to
> vm-10-75-47-157/10.75.47.157:8020 from cspk got value #4"}
>
> Any inputs on how to fix this issue would be helpful for us.
>
> Thanks and Regards,
> kaki mahesh raja
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>


Re: Sounds like Structured streaming with foreach, can only run on one executor

2021-03-09 Thread Sean Owen
That should not be the case. See
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
Maybe you are calling .foreach on some Scala object inadvertently.

On Tue, Mar 9, 2021 at 4:41 PM Mich Talebzadeh 
wrote:

> Hi,
>
> When I use *foreachBatch* in Spark Structured Streaming, yarn mode works
> fine.
>
> When one switches to *foreach* mode (row by row processing), this
> effectively runs in local mode on a single JVM. It seems to crash when
> running in a distributed mode. That is my experience.
>
> Can someone else please verify this independently?
>
> Cheers
>
>
>
>
>
>
>


Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread Sean Owen
You can also group by the key in the transformation on each batch. But yes
that's faster/easier if it's already partitioned that way.

On Tue, Mar 9, 2021 at 7:30 AM Ali Gouta  wrote:

> I do not know Kinesis, but it looks like it works like Kafka. Your producer
> should implement a partitioner that sends data with the same key to the
> same partition. Then each task in your Spark Streaming app will load data
> from the same partition in the same executor.
> I think this is the simplest way to achieve what you want to do.
>
> Best regards,
> Ali Gouta.
>
> On Tue, Mar 9, 2021 at 11:30 AM forece85  wrote:
>
>> We are doing batch processing using Spark Streaming with Kinesis with a
>> batch
>> size of 5 mins. We want to send all events with same eventId to same
>> executor for a batch so that we can do multiple events based grouping
>> operations based on eventId. No previous batch or future batch data is
>> concerned. Only Current batch keyed operation needed.
>>
>> Please help me how to achieve this.
>>
>> Thanks.
>>
>>
>>
>>
>>


Re: Creating spark context outside of the driver throws error

2021-03-08 Thread Sean Owen
Yep, you can never use Spark inside Spark.
You could run N jobs in parallel from the driver using Spark, however.

On Mon, Mar 8, 2021 at 3:14 PM Mich Talebzadeh 
wrote:

>
> In structured streaming with pySpark, I need to do some work on the row
> *foreach(process_row)*
>
> below
>
>
> def process_row(row):
>     ticker = row['ticker']
>     price = row['price']
>     if ticker == 'IBM':
>         print(ticker, price)
>         # read data from BigQuery table for analysis
>         appName = config['common']['appName']
>         spark_session = s.spark_session(appName)
>         dfBatchRead = s.loadTableFromBQ(spark_session,
>             config['MDVariables']['targetDataset'],
>             config['MDVariables']['targetTable'])
>
>
> class MDStreamingRow:
>     def __init__(self, spark_session, spark_context):
>         self.spark = spark_session
>         self.sc = spark_context
>         self.config = config
>
>     def fetch_data(self):
>         writeStream. \
>             outputMode('append'). \
>             option("truncate", "false"). \
>             foreach(process_row). \
>
>
> The issue I have is that spark_session is created at the driver (see
> below) and in order to load data from the BigQuery table, I need to call
> spark_session in the process_row method as above.
>
>
> if __name__ == "__main__":
>     appName = config['common']['appName']
>     spark_session = s.spark_session(appName)
>     mdstreaming = MDStreamingRow(spark_session, spark_context)
>
> However, I get this error when it gets to process_row()
>
>
> raise Exception("SparkContext should only be created and accessed on the
> driver.")
>
> Exception: SparkContext should only be created and accessed on the driver.
>
> FYI, the spark_session is defined as
>
> def spark_session(appName):
>   return SparkSession.builder \
> .appName(appName) \
> .enableHiveSupport() \
> .getOrCreate()
>
> Do I need to create SparkSessionSingleton etc?
>
> Thanks
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device

2021-03-08 Thread Sean Owen
It's there in the error: No space left on device
You ran out of disk space (local disk) on one of your machines.
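
To confirm this on a given machine, one can check how much space is left on the filesystem Spark uses for scratch data (shuffle and spill files). A minimal sketch using only the Python standard library; the helper name `free_space_report` is made up for illustration, and the directory checked here is just the system temp directory, which is an assumption. On a real cluster it should be whatever `spark.local.dir` (or the cluster manager's local directories) resolves to on each worker.

```python
import shutil
import tempfile


def free_space_report(path):
    """Return (free_gib, percent_free) for the filesystem holding path."""
    usage = shutil.disk_usage(path)
    free_gib = usage.free / 1024 ** 3
    percent_free = 100.0 * usage.free / usage.total
    return free_gib, percent_free


# Assumption: the scratch directory is the system temp dir. Replace it with
# the directory your executors actually write shuffle/spill files to.
scratch_dir = tempfile.gettempdir()
free_gib, percent_free = free_space_report(scratch_dir)
print(f"{scratch_dir}: {free_gib:.1f} GiB free ({percent_free:.1f}%)")
```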

On Mon, Mar 8, 2021 at 2:02 AM Sachit Murarka 
wrote:

> Hi All,
>
> I am getting the following error in my spark job.
>
> Can someone please have a look ?
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 41.0 (TID 80817, executor 193): com.esotericsoftware.kryo.KryoException:
> java.io.IOException: No space left on device
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:188)
>     at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
>     at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
>     at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
>     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:245)
>     at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
>     at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: No space left on device
>     at java.io.FileOutputStream.writeBytes(Native Method)
>     at java.io.FileOutputStream.write(FileOutputStream.java:326)
>     at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>     at net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:240)
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:186)
>     ... 19 more
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
>     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
>     at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)

Re: Possible upgrade path from Spark 3.1.1-RC2 to Spark 3.1.1 GA

2021-03-04 Thread Sean Owen
I think you're still asking about GCP and Dataproc, and that's really
nothing to do with Spark itself.
Whatever issues you are having concern Dataproc and how it's run and
possibly customizations in Dataproc.
3.1.1-RC2 is not a release, but, also nothing meaningfully changed between
it and the final 3.1.1 release. There is no need for any change to work
with 3.1.1.


On Thu, Mar 4, 2021 at 8:16 AM Mich Talebzadeh 
wrote:

> Hi,
>
> Is there any direct upgrade path from 3.1.1 RC-2 to 3.1.1 GA?
>
> If there is, will that involve replacing the Spark binaries?
>
> thanks,
>
> Mich


Re: [Spark SQL, intermediate+] possible bug or weird behavior of insertInto

2021-03-03 Thread Sean Owen
I don't have any good answer here, but I seem to recall that this is
because of SQL semantics, which follow column ordering, not naming, when
performing operations like this. It may well be as intended.
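
Since the insert is positional, the workaround mentioned in the question (selecting with the column order loaded from the target table) can be wrapped in a small guard. This is a sketch, not an official Spark utility: `align_to_target` is a hypothetical helper, and it relies only on the DataFrame's `columns` attribute and `select` method.

```python
def align_to_target(df, target_columns):
    """Reorder df's columns to match target_columns, selecting by name.

    Raises ValueError if the column sets differ, rather than letting a
    positional insert silently put values in the wrong columns.
    """
    missing = [c for c in target_columns if c not in df.columns]
    extra = [c for c in df.columns if c not in target_columns]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    return df.select(*target_columns)


# Intended use with PySpark (the table name is a placeholder):
#   target_cols = spark.table("db.events").columns
#   aligned = align_to_target(increment_df, target_cols)
#   aligned.write.insertInto("db.events", overwrite=True)
```

Matching names does not guarantee matching types, so a type check against the target schema may still be worth adding.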

On Tue, Mar 2, 2021 at 6:10 AM Oldrich Vlasic <
oldrich.vla...@datasentics.com> wrote:

> Hi,
>
> I have encountered a weird and potentially dangerous behaviour of Spark
> concerning partial overwrites of partitioned data. Not sure if this is a bug
> or just an abstraction leak. I have checked the Spark section of Stack
> Overflow and haven't found any relevant questions or answers.
>
> Full minimal working example provided as attachment. Tested on Databricks
> runtime 7.3 LTS ML (Spark 3.0.1). Short summary:
>
> Write a dataframe partitioned by a column using saveAsTable. Filter out part
> of the dataframe, change some values (simulating a new increment of data) and
> write again, overwriting a subset of partitions using insertInto. This
> operation will either fail on schema mismatch or cause data corruption.
>
> Reason: on the first write, the ordering of the columns is changed (the
> partition column is placed at the end). On the second write this is not taken
> into consideration and Spark tries to insert values into the columns based on
> their order and not on their name. If they have different types this will
> fail. If not, values will be written to incorrect columns, causing data
> corruption.
>
> My question: is this a bug or intended behaviour? Can something be done about
> it to prevent it? This issue can be avoided by doing a select with the schema
> loaded from the target table. However, when the user is not aware of this, it
> could cause hard-to-track-down errors in data.
>
> Best regards,
> Oldřich Vlašic


Re: Please update this notification on Spark download Site

2021-03-02 Thread Sean Owen
That statement is still accurate - it is saying the release will be 3.1.1,
not 3.1.0.
In any event, 3.1.1 is rolling out as we speak - already in Maven and
binaries are up and the website changes are being merged.

On Tue, Mar 2, 2021 at 9:10 AM Mich Talebzadeh 
wrote:

>
> Can someone please update the release date of 3.1.1 on the
> "Downloads | Apache Spark" page from
>
>    - Next official release: Spark 3.1.1 (Jan 07, 2021)
>
> to
>
>    - Next official release: Spark 3.1.1 (TBA)
>
> since there is no 3.1.1 officially released yet (discarding the rc-1 and rc-2
> releases).
>
>
> Thanks


Re: Spark closures behavior in local mode in IDEs

2021-02-26 Thread Sean Owen
Yeah this is a good question. It is certainly to do with executing within
the same JVM, but even I'd have to dig into the code to explain why the
spark-sql version operates differently, as that also appears to be local.
To be clear this 'shouldn't' work, just happens to not fail in local
execution.

On Fri, Feb 26, 2021 at 10:40 AM Sheel Pancholi 
wrote:

>
> I am afraid that might at best be partially true. What would explain
> spark-shell in local mode also throwing the same error? It should have run
> fine by that logic. On digging further, it became apparent why this was
> happening.
>
> When you run your code by simply adding the libraries to your project and
> running in local mode, you are not really running a spark-submit, and your
> static/singleton "object" is available across tasks inside the same JVM.
> When you instead run that same code from an instance of a class from your IDE
> in local mode, you see the exact same serialization issue, since the instance
> has to be distributed to the tasks. Technically, still being within the same
> JVM should not directly require serialization, but somehow the flow of
> control leads the Spark libraries to see this as an issue. One can easily
> extrapolate and hypothesize (unless the creators spare some time answering on
> this thread) that a singleton object is treated differently under
> spark-submit in local mode, possibly to bring homogeneity to the way
> spark-submit treats local and cluster mode: even a static/singleton
> might/will need to be moved to a different machine in cluster mode, putting
> its case on par with a regular instance of a class.
>
>
> On Fri, Feb 26, 2021 at 9:07 PM Lalwani, Jayesh 
> wrote:
>
>> Yes, as you found, in local mode, Spark won’t serialize your objects. It
>> will just pass the reference to the closure. This means that it is possible
>> to write code that works in local mode, but doesn’t when you run
>> distributed.
>>
>>
>>
>> *From: *Sheel Pancholi 
>> *Date: *Friday, February 26, 2021 at 4:24 AM
>> *To: *user 
>> *Subject: *[EXTERNAL] Spark closures behavior in local mode in IDEs
>>
>> Hi,
>>
>> I am observing weird behavior of Spark and closures in local mode on my
>> machine vs. a 3-node cluster (Spark 2.4.5).
>>
>> Following is the piece of code:
>>
>> object Example {
>>   val num = 5
>>   def myfunc = {
>>     sc.parallelize(1 to 4).map(_ + num).foreach(println)
>>   }
>> }
>>
>> I expected this to fail regardless, since the variable num is needed in the
>> closure and therefore the Example object would need to be serialized, but it
>> cannot be since it does not extend the Serializable interface.
>>
>> - When I run the same piece of code from spark-shell on my local machine,
>>   it fails with the error expected from the rationale above [screenshot
>>   removed].
>> - When I run the same piece of code in yarn mode on a 3-node EMR cluster,
>>   it fails with the exact same error as in the screenshot above, given the
>>   same rationale.
>> - When I run the same piece of code in local mode on the same cluster
>>   (=> master node), it also fails. The same rationale still holds.
>> - However, when I run it from an sbt project (not a Spark installation or
>>   anything; I just added the Spark libraries to my sbt project and used
>>   conf.master(local[..])), it runs fine in local mode and gives me an output
>>   of 6, 7, 8, 9 [screenshot removed].
>>
>> This means it fails everywhere except when you run it by adding Spark
>> dependencies to your sbt project. The question is: what explains the
>> different local mode behavior when running your Spark code by simply adding
>> the Spark libraries to an sbt project?
>>
>>
>>
>> Regards,
>>
>> Sheel
>>
>
>
> --
>
> Best Regards,
>
> Sheel Pancholi


Re: Issue after change to 3.0.2

2021-02-26 Thread Sean Owen
That looks to me like you have two different versions of Spark in use
somewhere here. Like the cluster and driver versions aren't quite the same.
Check your classpaths?
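
A quick way to act on this is to collect the Spark version reported on the driver and on each worker (for instance from `spark-submit --version` on each machine) and group the hosts by version. A minimal sketch; the helper name and the host/version values below are invented purely for illustration.

```python
def group_hosts_by_version(versions_by_host):
    """Group host labels by the Spark version string each one reports."""
    groups = {}
    for host, version in versions_by_host.items():
        groups.setdefault(version.strip(), []).append(host)
    return groups


# Made-up values; in practice, fill these in from each machine.
reported = {"driver": "3.0.2", "worker-1": "3.0.1", "worker-2": "3.0.2"}
groups = group_hosts_by_version(reported)
if len(groups) > 1:
    print("Mixed Spark versions found:", groups)
else:
    print("All hosts report the same Spark version.")
```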

On Fri, Feb 26, 2021 at 2:53 AM Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi All,
>
> After changing to 3.0.2 I face the following issue. Thanks for any hint on
> that issue.
>
> Best,
> Meikel
>
>     df = self.spark.read.json(path_in)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 300, in json
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o76.json.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 14, 192.168.1.6, executor 0): java.io.InvalidClassException:
> org.apache.spark.broadcast.TorrentBroadcast; local class incompatible:
> stream classdesc serialVersionUID = 4804550167553929379, local class
> serialVersionUID = 3291767831129286585
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:407)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
>     at scala.Option.foreach(Option.scala:407)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
>     at org.apache.spark.sql.catalyst.json.JsonInferSchema.infer(JsonInferSchema.scala:94)
>     at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$inferFromDataset$5(JsonDataSource.scala:110)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
> at
> 

Re: A serious bug in the fitting of a binary logistic regression.

2021-02-22 Thread Sean Owen
I'll take a look. At a glance - is it converging? might turn down the
tolerance to check.
Also what does scikit learn say on the same data? we can continue on the
JIRA.
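
To illustrate why the tolerance matters when asking "is it converging?", here is a toy, pure-Python logistic regression fitted by plain gradient descent. It is not Spark's optimizer and not the code from the JIRA; the data, learning rate, and function name are made up. It only shows that a loose tolerance can stop the fit well before the coefficients have settled. In Spark ML the corresponding knobs on `LogisticRegression` are `tol` and `maxIter`.

```python
import math


def fit_logistic_gd(xs, ys, tol, learning_rate=0.5, max_iter=10000):
    """Fit p(y=1|x) = sigmoid(w*x + b) by plain gradient descent.

    Stops when the mean log loss changes by less than tol between
    iterations. Returns (w, b, iterations_used).
    """
    w, b = 0.0, 0.0
    previous_loss = float("inf")
    n = len(xs)
    for iteration in range(1, max_iter + 1):
        loss = grad_w = grad_b = 0.0
        for x, y in zip(xs, ys):
            p = 1.0 / (1.0 + math.exp(-(w * x + b)))
            loss -= y * math.log(p) + (1 - y) * math.log(1.0 - p)
            grad_w += (p - y) * x
            grad_b += p - y
        loss /= n
        if abs(previous_loss - loss) < tol:
            return w, b, iteration
        previous_loss = loss
        w -= learning_rate * grad_w / n
        b -= learning_rate * grad_b / n
    return w, b, max_iter


# Small, non-separable toy data set (invented for this sketch).
xs = [-2.0, -1.0, -0.5, 0.5, 1.0, 2.0]
ys = [0, 0, 1, 0, 1, 1]
for tol in (1e-2, 1e-8):
    w, b, used = fit_logistic_gd(xs, ys, tol)
    print(f"tol={tol:g}: w={w:.4f}, b={b:.4f}, iterations={used}")
```

If tightening the tolerance changes the coefficients noticeably, the looser fit had not converged, which is worth ruling out before comparing against another library.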

On Mon, Feb 22, 2021 at 5:42 PM Yakov Kerzhner  wrote:

> I have written up a JIRA, and there is a gist attached that has code that
> reproduces the issue. This is a fairly serious issue as it probably
> affects everyone who uses Spark to fit binary logistic regressions.
> https://issues.apache.org/jira/browse/SPARK-34448
> It would be great if someone who understands binary logistic regressions and
> the implementation in Scala could take a look.
>


Re: spark 3.1.1 release date?

2021-02-20 Thread Sean Owen
Another RC is starting imminently, which looks pretty good. If it succeeds,
probably next week.
It will support Scala 2.12, but I believe a Scala 2.13 build is only coming
in 3.2.0.

On Sat, Feb 20, 2021 at 1:54 PM Bulldog20630405 
wrote:

>
> what is the expected ballpark release date of spark 3.1.1 ?
> will it be built with scala 2.13 ?
>


Re: Using Custom Scala Spark ML Estimator in PySpark

2021-02-16 Thread Sean Owen
You won't be able to use it in python if it is implemented in Java - needs
a python wrapper too.

On Mon, Feb 15, 2021, 11:29 PM HARSH TAKKAR  wrote:

> Hi ,
>
> I have created a custom Estimator in Scala, which I can use successfully
> by creating a pipeline model in Java and Scala. But when I try to load the
> pipeline model saved using the Scala API in PySpark, I get an error
> saying module not found.
>
> I have included my custom model jar in the classpath using "spark.jars".
>
> Can you please help, if I am missing something?
>
> Kind Regards
> Harsh Takkar
>


Re: vm.swappiness value for Spark on Kubernetes

2021-02-16 Thread Sean Owen
You probably don't want swapping in any environment. Some tasks will grind
to a halt under mem pressure rather than just fail quickly. You would want
to simply provision more memory.
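
To see what a given Linux host is actually configured with, the value can be read from `/proc/sys/vm/swappiness`. A minimal sketch using only the standard library; the helper name and the warning threshold of 10 are assumptions for illustration, not a Spark or Kubernetes recommendation.

```python
from pathlib import Path


def read_swappiness(path="/proc/sys/vm/swappiness"):
    """Return the kernel's vm.swappiness as an int, or None if unavailable."""
    try:
        return int(Path(path).read_text().strip())
    except (OSError, ValueError):
        return None


value = read_swappiness()
if value is None:
    print("vm.swappiness not available on this platform")
elif value > 10:  # threshold chosen only for this example
    print(f"vm.swappiness={value}: the kernel may swap under memory pressure")
else:
    print(f"vm.swappiness={value}")
```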

On Tue, Feb 16, 2021, 7:57 AM Jahar Tyagi  wrote:

> Hi,
>
> We have recently migrated from Spark 2.4.4 to Spark 3.0.1 and are using Spark
> on virtual machines/bare metal as a standalone deployment and as a Kubernetes
> deployment as well.
>
> There is a kernel parameter named 'vm.swappiness', and we keep its value
> at '1' in the standard deployment. We are now moving to Kubernetes, and on
> Kubernetes worker nodes the value of this parameter is '60'.
>
> My question is whether it is OK to keep such a high value of
> 'vm.swappiness'=60 in a Kubernetes environment for Spark workloads.
>
> Will such a high value of this kernel parameter have a performance impact on
> Spark pods?
> As per the link below from Cloudera, they suggest not setting such a high
> value.
>
>
> https://docs.cloudera.com/cloudera-manager/7.2.6/managing-clusters/topics/cm-setting-vmswappiness-linux-kernel-parameter.html
>
> Any thoughts/suggestions on this are highly appreciated.
>
> Regards
> Jahar Tyagi
>
>

